aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/lib/iomgr/ev_epoll1_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollex_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_epollsig_linux.cc2
-rw-r--r--src/core/lib/iomgr/ev_poll_posix.cc11
-rw-r--r--src/node/health_check/package.json29
-rw-r--r--src/node/tools/package.json41
-rw-r--r--src/python/grpcio/grpc/_channel.py66
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi24
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi109
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi238
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi21
-rw-r--r--src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi248
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pxd1
-rw-r--r--src/python/grpcio/grpc/_cython/cygrpc.pyx1
-rw-r--r--src/python/grpcio/grpc/_server.py46
-rw-r--r--src/python/grpcio_tests/tests/tests.json1
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py26
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py22
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py20
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py24
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/_server_test.py49
-rw-r--r--src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py94
22 files changed, 579 insertions, 498 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc
index 92b4b8b90b..1ab7e516de 100644
--- a/src/core/lib/iomgr/ev_epoll1_linux.cc
+++ b/src/core/lib/iomgr/ev_epoll1_linux.cc
@@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
/* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return
* NULL */
const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epoll1 because GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc
index df2f629703..5f5f45a7a5 100644
--- a/src/core/lib/iomgr/ev_epollex_linux.cc
+++ b/src/core/lib/iomgr/ev_epollex_linux.cc
@@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollex_linux(
bool explicitly_requested) {
- gpr_log(GPR_ERROR,
- "Skipping epollex because GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc
index bc548a1fda..8072a6cbed 100644
--- a/src/core/lib/iomgr/ev_epollsig_linux.cc
+++ b/src/core/lib/iomgr/ev_epollsig_linux.cc
@@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux(
* NULL */
const grpc_event_engine_vtable* grpc_init_epollsig_linux(
bool explicit_request) {
- gpr_log(GPR_ERROR,
- "Skipping epollsig because GRPC_LINUX_EPOLL is not defined.");
return nullptr;
}
#endif /* defined(GRPC_POSIX_SOCKET) */
diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc
index 53de94fb6e..a569f674f6 100644
--- a/src/core/lib/iomgr/ev_poll_posix.cc
+++ b/src/core/lib/iomgr/ev_poll_posix.cc
@@ -71,6 +71,7 @@ struct grpc_fd {
int shutdown;
int closed;
int released;
+ gpr_atm pollhup;
grpc_error* shutdown_error;
/* The watcher list.
@@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) {
r->on_done_closure = nullptr;
r->closed = 0;
r->released = 0;
+ gpr_atm_no_barrier_store(&r->pollhup, 0);
r->read_notifier_pollset = nullptr;
char* name2;
@@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[0].events = POLLIN;
pfds[0].revents = 0;
for (i = 0; i < pollset->fd_count; i++) {
- if (fd_is_orphaned(pollset->fds[i])) {
+ if (fd_is_orphaned(pollset->fds[i]) ||
+ gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) {
GRPC_FD_UNREF(pollset->fds[i], "multipoller");
} else {
pollset->fds[fd_count++] = pollset->fds[i];
@@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset,
pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0,
(pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents);
}
+ /* This is a mitigation to prevent poll() from spinning on a
+ ** POLLHUP https://github.com/grpc/grpc/pull/13665
+ */
+ if (pfds[i].revents & POLLHUP) {
+ gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1);
+ }
fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK,
pfds[i].revents & POLLOUT_CHECK, pollset);
}
diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json
deleted file mode 100644
index fca3a2a7a6..0000000000
--- a/src/node/health_check/package.json
+++ /dev/null
@@ -1,29 +0,0 @@
-{
- "name": "grpc-health-check",
- "version": "1.7.2",
- "author": "Google Inc.",
- "description": "Health check service for use with gRPC",
- "repository": {
- "type": "git",
- "url": "https://github.com/grpc/grpc.git"
- },
- "bugs": "https://github.com/grpc/grpc/issues",
- "contributors": [
- {
- "name": "Michael Lumish",
- "email": "mlumish@google.com"
- }
- ],
- "dependencies": {
- "grpc": "^1.7.2",
- "lodash": "^3.9.3",
- "google-protobuf": "^3.0.0"
- },
- "files": [
- "LICENSE",
- "health.js",
- "v1"
- ],
- "main": "src/node/index.js",
- "license": "Apache-2.0"
-}
diff --git a/src/node/tools/package.json b/src/node/tools/package.json
deleted file mode 100644
index 99fd854067..0000000000
--- a/src/node/tools/package.json
+++ /dev/null
@@ -1,41 +0,0 @@
-{
- "name": "grpc-tools",
- "version": "1.7.2",
- "author": "Google Inc.",
- "description": "Tools for developing with gRPC on Node.js",
- "homepage": "https://grpc.io/",
- "repository": {
- "type": "git",
- "url": "https://github.com/grpc/grpc.git"
- },
- "bugs": "https://github.com/grpc/grpc/issues",
- "contributors": [
- {
- "name": "Michael Lumish",
- "email": "mlumish@google.com"
- }
- ],
- "bin": {
- "grpc_tools_node_protoc": "./bin/protoc.js",
- "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js"
- },
- "scripts": {
- "install": "./node_modules/.bin/node-pre-gyp install"
- },
- "bundledDependencies": ["node-pre-gyp"],
- "binary": {
- "module_name": "grpc_tools",
- "host": "https://storage.googleapis.com/",
- "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}",
- "package_name": "{platform}-{arch}.tar.gz",
- "module_path": "bin"
- },
- "files": [
- "index.js",
- "bin/protoc.js",
- "bin/protoc_plugin.js",
- "bin/google/protobuf",
- "LICENSE"
- ],
- "main": "index.js"
-}
diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py
index d7456a3dd1..3572737c87 100644
--- a/src/python/grpcio/grpc/_channel.py
+++ b/src/python/grpcio/grpc/_channel.py
@@ -129,12 +129,12 @@ def _abort(state, code, details):
def _handle_event(event, state, response_deserializer):
callbacks = []
for batch_operation in event.batch_operations:
- operation_type = batch_operation.type
+ operation_type = batch_operation.type()
state.due.remove(operation_type)
if operation_type == cygrpc.OperationType.receive_initial_metadata:
- state.initial_metadata = batch_operation.received_metadata
+ state.initial_metadata = batch_operation.initial_metadata()
elif operation_type == cygrpc.OperationType.receive_message:
- serialized_response = batch_operation.received_message.bytes()
+ serialized_response = batch_operation.message()
if serialized_response is not None:
response = _common.deserialize(serialized_response,
response_deserializer)
@@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer):
else:
state.response = response
elif operation_type == cygrpc.OperationType.receive_status_on_client:
- state.trailing_metadata = batch_operation.received_metadata
+ state.trailing_metadata = batch_operation.trailing_metadata()
if state.code is None:
code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get(
- batch_operation.received_status_code)
+ batch_operation.code())
if code is None:
state.code = grpc.StatusCode.UNKNOWN
state.details = _unknown_code_details(
- batch_operation.received_status_code,
- batch_operation.received_status_details)
+ code, batch_operation.details())
else:
state.code = code
- state.details = batch_operation.received_status_details
+ state.details = batch_operation.details()
callbacks.extend(state.callbacks)
state.callbacks = None
return callbacks
@@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call,
_abort(state, grpc.StatusCode.INTERNAL, details)
return
else:
- operations = (cygrpc.operation_send_message(
+ operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_message)
@@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call,
with state.condition:
if state.code is None:
operations = (
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
call.start_client_batch(operations, event_handler)
state.due.add(cygrpc.OperationType.send_close_from_client)
@@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call):
event_handler = _event_handler(self._state, self._call,
self._response_deserializer)
self._call.start_client_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
self._state.due.add(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
@@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable):
else:
state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
return state, operations, deadline, deadline_timespec, None
def _blocking(self, request, timeout, metadata, credentials):
@@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable):
self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(
- metadata, _EMPTY_FLAGS), cygrpc.operation_send_message(
+ cygrpc.SendInitialMetadataOperation(
+ metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
call.set_credentials(credentials._credentials)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
- None)
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, None)
_check_call_error(call_error, metadata)
_consume_request_iterator(request_iterator, state, call,
@@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
@@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable):
event_handler = _event_handler(state, call, self._response_deserializer)
with state.condition:
call.start_client_batch(
- (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),),
event_handler)
operations = (
- cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
call_error = call.start_client_batch(operations, event_handler)
if call_error != cygrpc.CallError.ok:
_call_error_set_RPCstate(state, call_error, metadata)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
index 6a72bbf693..6ee833697d 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi
@@ -17,6 +17,7 @@ cimport libc.time
# Typedef types with approximately the same semantics to provide their names to
# Cython
+ctypedef unsigned char uint8_t
ctypedef int int32_t
ctypedef unsigned uint32_t
ctypedef long int64_t
@@ -25,6 +26,7 @@ ctypedef long int64_t
cdef extern from "grpc/support/alloc.h":
void *gpr_malloc(size_t size) nogil
+ void *gpr_zalloc(size_t size) nogil
void gpr_free(void *ptr) nogil
void *gpr_realloc(void *p, size_t size) nogil
@@ -183,6 +185,18 @@ cdef extern from "grpc/grpc.h":
size_t arguments_length "num_args"
grpc_arg *arguments "args"
+ ctypedef enum grpc_compression_level:
+ GRPC_COMPRESS_LEVEL_NONE
+ GRPC_COMPRESS_LEVEL_LOW
+ GRPC_COMPRESS_LEVEL_MED
+ GRPC_COMPRESS_LEVEL_HIGH
+
+ ctypedef enum grpc_stream_compression_level:
+ GRPC_STREAM_COMPRESS_LEVEL_NONE
+ GRPC_STREAM_COMPRESS_LEVEL_LOW
+ GRPC_STREAM_COMPRESS_LEVEL_MED
+ GRPC_STREAM_COMPRESS_LEVEL_HIGH
+
ctypedef enum grpc_call_error:
GRPC_CALL_OK
GRPC_CALL_ERROR
@@ -258,9 +272,19 @@ cdef extern from "grpc/grpc.h":
GRPC_OP_RECV_STATUS_ON_CLIENT
GRPC_OP_RECV_CLOSE_ON_SERVER
+ ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level:
+ uint8_t is_set
+ grpc_compression_level level
+
+ ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level:
+ uint8_t is_set
+ grpc_stream_compression_level level
+
ctypedef struct grpc_op_data_send_initial_metadata:
size_t count
grpc_metadata *metadata
+ grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level
+ grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level
ctypedef struct grpc_op_data_send_status_from_server:
size_t trailing_metadata_count
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
new file mode 100644
index 0000000000..bfbe27785b
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi
@@ -0,0 +1,109 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+ # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this!
+ cdef grpc_op c_op
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ cdef readonly object _initial_metadata;
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_initial_metadata
+ cdef size_t _c_initial_metadata_count
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendMessageOperation(Operation):
+
+ cdef readonly bytes _message
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ cdef readonly int _flags
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ cdef readonly object _trailing_metadata
+ cdef readonly object _code
+ cdef readonly object _details
+ cdef readonly int _flags
+ cdef grpc_metadata *_c_trailing_metadata
+ cdef size_t _c_trailing_metadata_count
+ cdef grpc_slice _c_details
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ cdef readonly int _flags
+ cdef tuple _initial_metadata
+ cdef grpc_metadata_array _c_initial_metadata
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_byte_buffer *_c_message_byte_buffer
+ cdef bytes _message
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ cdef readonly int _flags
+ cdef grpc_metadata_array _c_trailing_metadata
+ cdef grpc_status_code _c_code
+ cdef grpc_slice _c_details
+ cdef tuple _trailing_metadata
+ cdef object _code
+ cdef str _details
+
+ cdef void c(self)
+ cdef void un_c(self)
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ cdef readonly int _flags
+ cdef object _cancelled
+ cdef int _c_cancelled
+
+ cdef void c(self)
+ cdef void un_c(self)
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
new file mode 100644
index 0000000000..3c91abf722
--- /dev/null
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi
@@ -0,0 +1,238 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+cdef class Operation:
+
+ cdef void c(self):
+ raise NotImplementedError()
+
+ cdef void un_c(self):
+ raise NotImplementedError()
+
+
+cdef class SendInitialMetadataOperation(Operation):
+
+ def __cinit__(self, initial_metadata, flags):
+ self._initial_metadata = initial_metadata
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_INITIAL_METADATA
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._initial_metadata, &self._c_initial_metadata,
+ &self._c_initial_metadata_count)
+ self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata
+ self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count
+ self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0
+ self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0
+
+ cdef void un_c(self):
+ _release_c_metadata(
+ self._c_initial_metadata, self._c_initial_metadata_count)
+
+
+cdef class SendMessageOperation(Operation):
+
+ def __cinit__(self, bytes message, int flags):
+ self._message = message
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_MESSAGE
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_MESSAGE
+ self.c_op.flags = self._flags
+ cdef grpc_slice message_slice = grpc_slice_from_copied_buffer(
+ self._message, len(self._message))
+ self._c_message_byte_buffer = grpc_raw_byte_buffer_create(
+ &message_slice, 1)
+ grpc_slice_unref(message_slice)
+ self.c_op.data.send_message.send_message = self._c_message_byte_buffer
+
+ cdef void un_c(self):
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+
+
+cdef class SendCloseFromClientOperation(Operation):
+
+ def __cinit__(self, int flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_CLOSE_FROM_CLIENT
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
+ self.c_op.flags = self._flags
+
+ cdef void un_c(self):
+ pass
+
+
+cdef class SendStatusFromServerOperation(Operation):
+
+ def __cinit__(self, trailing_metadata, code, object details, int flags):
+ self._trailing_metadata = trailing_metadata
+ self._code = code
+ self._details = details
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_SEND_STATUS_FROM_SERVER
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
+ self.c_op.flags = self._flags
+ _store_c_metadata(
+ self._trailing_metadata, &self._c_trailing_metadata,
+ &self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.trailing_metadata = (
+ self._c_trailing_metadata)
+ self.c_op.data.send_status_from_server.trailing_metadata_count = (
+ self._c_trailing_metadata_count)
+ self.c_op.data.send_status_from_server.status = self._code
+ self._c_details = _slice_from_bytes(_encode(self._details))
+ self.c_op.data.send_status_from_server.status_details = &self._c_details
+
+ cdef void un_c(self):
+ grpc_slice_unref(self._c_details)
+ _release_c_metadata(
+ self._c_trailing_metadata, self._c_trailing_metadata_count)
+
+
+cdef class ReceiveInitialMetadataOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_INITIAL_METADATA
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_initial_metadata)
+ self.c_op.data.receive_initial_metadata.receive_initial_metadata = (
+ &self._c_initial_metadata)
+
+ cdef void un_c(self):
+ self._initial_metadata = _metadata(&self._c_initial_metadata)
+ grpc_metadata_array_destroy(&self._c_initial_metadata)
+
+ def initial_metadata(self):
+ return self._initial_metadata
+
+
+cdef class ReceiveMessageOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_MESSAGE
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_MESSAGE
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_message.receive_message = (
+ &self._c_message_byte_buffer)
+
+ cdef void un_c(self):
+ cdef grpc_byte_buffer_reader message_reader
+ cdef bint message_reader_status
+ cdef grpc_slice message_slice
+ cdef size_t message_slice_length
+ cdef void *message_slice_pointer
+ if self._c_message_byte_buffer != NULL:
+ message_reader_status = grpc_byte_buffer_reader_init(
+ &message_reader, self._c_message_byte_buffer)
+ if message_reader_status:
+ message = bytearray()
+ while grpc_byte_buffer_reader_next(&message_reader, &message_slice):
+ message_slice_pointer = grpc_slice_start_ptr(message_slice)
+ message_slice_length = grpc_slice_length(message_slice)
+ message += (<char *>message_slice_pointer)[:message_slice_length]
+ grpc_slice_unref(message_slice)
+ grpc_byte_buffer_reader_destroy(&message_reader)
+ self._message = bytes(message)
+ else:
+ self._message = None
+ grpc_byte_buffer_destroy(self._c_message_byte_buffer)
+ else:
+ self._message = None
+
+ def message(self):
+ return self._message
+
+
+cdef class ReceiveStatusOnClientOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_STATUS_ON_CLIENT
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
+ self.c_op.flags = self._flags
+ grpc_metadata_array_init(&self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.trailing_metadata = (
+ &self._c_trailing_metadata)
+ self.c_op.data.receive_status_on_client.status = (
+ &self._c_code)
+ self.c_op.data.receive_status_on_client.status_details = (
+ &self._c_details)
+
+ cdef void un_c(self):
+ self._trailing_metadata = _metadata(&self._c_trailing_metadata)
+ grpc_metadata_array_destroy(&self._c_trailing_metadata)
+ self._code = self._c_code
+ self._details = _decode(_slice_bytes(self._c_details))
+ grpc_slice_unref(self._c_details)
+
+ def trailing_metadata(self):
+ return self._trailing_metadata
+
+ def code(self):
+ return self._code
+
+ def details(self):
+ return self._details
+
+
+cdef class ReceiveCloseOnServerOperation(Operation):
+
+ def __cinit__(self, flags):
+ self._flags = flags
+
+ def type(self):
+ return GRPC_OP_RECV_CLOSE_ON_SERVER
+
+ cdef void c(self):
+ self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
+ self.c_op.flags = self._flags
+ self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled
+
+ cdef void un_c(self):
+ self._cancelled = bool(self._c_cancelled)
+
+ def cancelled(self):
+ return self._cancelled
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
index 594fdb1a8b..537cf2b537 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi
@@ -65,11 +65,6 @@ cdef class Event:
cdef readonly object batch_operations
-cdef class ByteBuffer:
-
- cdef grpc_byte_buffer *c_byte_buffer
-
-
cdef class SslPemKeyCertPair:
cdef grpc_ssl_pem_key_cert_pair c_pair
@@ -89,22 +84,6 @@ cdef class ChannelArgs:
cdef list args
-cdef class Operation:
-
- cdef grpc_op c_op
- cdef bint _c_metadata_needs_release
- cdef size_t _c_metadata_count
- cdef grpc_metadata *_c_metadata
- cdef ByteBuffer _received_message
- cdef bint _c_metadata_array_needs_destruction
- cdef grpc_metadata_array _c_metadata_array
- cdef grpc_status_code _received_status_code
- cdef grpc_slice _status_details
- cdef int _received_cancelled
- cdef readonly bint is_valid
- cdef object references
-
-
cdef class CompressionOptions:
cdef grpc_compression_options c_options
diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
index 26eaf50eb4..7e6272fe2a 100644
--- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
+++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi
@@ -229,13 +229,15 @@ cdef class OperationTag:
self.c_nops = 0 if self._operations is None else len(self._operations)
if 0 < self.c_nops:
self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops)
- for index in range(self.c_nops):
- self.c_ops[index] = (<Operation>(self._operations[index])).c_op
+ for index, operation in enumerate(self._operations):
+ (<Operation>operation).c()
+ self.c_ops[index] = (<Operation>operation).c_op
cdef object release_ops(self):
if 0 < self.c_nops:
for index, operation in enumerate(self._operations):
(<Operation>operation).c_op = self.c_ops[index]
+ (<Operation>operation).un_c()
gpr_free(self.c_ops)
return self._operations
else:
@@ -260,69 +262,6 @@ cdef class Event:
self.is_new_request = is_new_request
-cdef class ByteBuffer:
-
- def __cinit__(self, bytes data):
- grpc_init()
- if data is None:
- self.c_byte_buffer = NULL
- return
-
- cdef char *c_data = data
- cdef grpc_slice data_slice
- cdef size_t data_length = len(data)
- with nogil:
- data_slice = grpc_slice_from_copied_buffer(c_data, data_length)
- with nogil:
- self.c_byte_buffer = grpc_raw_byte_buffer_create(
- &data_slice, 1)
- with nogil:
- grpc_slice_unref(data_slice)
-
- def bytes(self):
- cdef grpc_byte_buffer_reader reader
- cdef grpc_slice data_slice
- cdef size_t data_slice_length
- cdef void *data_slice_pointer
- cdef bint reader_status
- if self.c_byte_buffer != NULL:
- with nogil:
- reader_status = grpc_byte_buffer_reader_init(
- &reader, self.c_byte_buffer)
- if not reader_status:
- return None
- result = bytearray()
- with nogil:
- while grpc_byte_buffer_reader_next(&reader, &data_slice):
- data_slice_pointer = grpc_slice_start_ptr(data_slice)
- data_slice_length = grpc_slice_length(data_slice)
- with gil:
- result += (<char *>data_slice_pointer)[:data_slice_length]
- grpc_slice_unref(data_slice)
- with nogil:
- grpc_byte_buffer_reader_destroy(&reader)
- return bytes(result)
- else:
- return None
-
- def __len__(self):
- cdef size_t result
- if self.c_byte_buffer != NULL:
- with nogil:
- result = grpc_byte_buffer_length(self.c_byte_buffer)
- return result
- else:
- return 0
-
- def __str__(self):
- return self.bytes()
-
- def __dealloc__(self):
- if self.c_byte_buffer != NULL:
- grpc_byte_buffer_destroy(self.c_byte_buffer)
- grpc_shutdown()
-
-
cdef class SslPemKeyCertPair:
def __cinit__(self, bytes private_key, bytes certificate_chain):
@@ -407,185 +346,6 @@ cdef class ChannelArgs:
return self.args[i]
-cdef class Operation:
-
- def __cinit__(self):
- grpc_init()
- self.references = []
- self._c_metadata_needs_release = False
- self._c_metadata_array_needs_destruction = False
- self._status_details = grpc_empty_slice()
- self.is_valid = False
-
- @property
- def type(self):
- return self.c_op.type
-
- @property
- def flags(self):
- return self.c_op.flags
-
- @property
- def has_status(self):
- return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT
-
- @property
- def received_message(self):
- if self.c_op.type != GRPC_OP_RECV_MESSAGE:
- raise TypeError("self must be an operation receiving a message")
- return self._received_message
-
- @property
- def received_message_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_MESSAGE:
- return None
- return self._received_message
-
- @property
- def received_metadata(self):
- if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and
- self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT):
- raise TypeError("self must be an operation receiving metadata")
- return _metadata(&self._c_metadata_array)
-
- @property
- def received_status_code(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- raise TypeError("self must be an operation receiving a status code")
- return self._received_status_code
-
- @property
- def received_status_code_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- return None
- return self._received_status_code
-
- @property
- def received_status_details(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- raise TypeError("self must be an operation receiving status details")
- return _slice_bytes(self._status_details)
-
- @property
- def received_status_details_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT:
- return None
- return _slice_bytes(self._status_details)
-
- @property
- def received_cancelled(self):
- if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
- raise TypeError("self must be an operation receiving cancellation "
- "information")
- return False if self._received_cancelled == 0 else True
-
- @property
- def received_cancelled_or_none(self):
- if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER:
- return None
- return False if self._received_cancelled == 0 else True
-
- def __dealloc__(self):
- if self._c_metadata_needs_release:
- _release_c_metadata(self._c_metadata, self._c_metadata_count)
- if self._c_metadata_array_needs_destruction:
- grpc_metadata_array_destroy(&self._c_metadata_array)
- grpc_slice_unref(self._status_details)
- grpc_shutdown()
-
-def operation_send_initial_metadata(metadata, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA
- op.c_op.flags = flags
- _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
- op._c_metadata_needs_release = True
- op.c_op.data.send_initial_metadata.count = op._c_metadata_count
- op.c_op.data.send_initial_metadata.metadata = op._c_metadata
- op.is_valid = True
- return op
-
-def operation_send_message(data, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_MESSAGE
- op.c_op.flags = flags
- byte_buffer = ByteBuffer(data)
- op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer
- op.references.append(byte_buffer)
- op.is_valid = True
- return op
-
-def operation_send_close_from_client(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT
- op.c_op.flags = flags
- op.is_valid = True
- return op
-
-def operation_send_status_from_server(
- metadata, grpc_status_code code, bytes details, int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER
- op.c_op.flags = flags
- _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count)
- op._c_metadata_needs_release = True
- op.c_op.data.send_status_from_server.trailing_metadata_count = (
- op._c_metadata_count)
- op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata
- op.c_op.data.send_status_from_server.status = code
- grpc_slice_unref(op._status_details)
- op._status_details = _slice_from_bytes(details)
- op.c_op.data.send_status_from_server.status_details = &op._status_details
- op.is_valid = True
- return op
-
-def operation_receive_initial_metadata(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA
- op.c_op.flags = flags
- grpc_metadata_array_init(&op._c_metadata_array)
- op.c_op.data.receive_initial_metadata.receive_initial_metadata = (
- &op._c_metadata_array)
- op._c_metadata_array_needs_destruction = True
- op.is_valid = True
- return op
-
-def operation_receive_message(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_MESSAGE
- op.c_op.flags = flags
- op._received_message = ByteBuffer(None)
- # n.b. the c_op.data.receive_message field needs to be deleted by us,
- # anyway, so we just let that be handled by the ByteBuffer() we allocated
- # the line before.
- op.c_op.data.receive_message.receive_message = (
- &op._received_message.c_byte_buffer)
- op.is_valid = True
- return op
-
-def operation_receive_status_on_client(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT
- op.c_op.flags = flags
- grpc_metadata_array_init(&op._c_metadata_array)
- op.c_op.data.receive_status_on_client.trailing_metadata = (
- &op._c_metadata_array)
- op._c_metadata_array_needs_destruction = True
- op.c_op.data.receive_status_on_client.status = (
- &op._received_status_code)
- op.c_op.data.receive_status_on_client.status_details = (
- &op._status_details)
- op.is_valid = True
- return op
-
-def operation_receive_close_on_server(int flags):
- cdef Operation op = Operation()
- op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER
- op.c_op.flags = flags
- op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled
- op.is_valid = True
- return op
-
-
cdef class CompressionOptions:
def __cinit__(self):
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd
index 6fc5638d5d..ad229de0ae 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pxd
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd
@@ -19,6 +19,7 @@ include "_cygrpc/channel.pxd.pxi"
include "_cygrpc/credentials.pxd.pxi"
include "_cygrpc/completion_queue.pxd.pxi"
include "_cygrpc/metadata.pxd.pxi"
+include "_cygrpc/operation.pxd.pxi"
include "_cygrpc/records.pxd.pxi"
include "_cygrpc/security.pxd.pxi"
include "_cygrpc/server.pxd.pxi"
diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx
index d605229822..0964fb66ab 100644
--- a/src/python/grpcio/grpc/_cython/cygrpc.pyx
+++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx
@@ -26,6 +26,7 @@ include "_cygrpc/channel.pyx.pxi"
include "_cygrpc/credentials.pyx.pxi"
include "_cygrpc/completion_queue.pyx.pxi"
include "_cygrpc/metadata.pyx.pxi"
+include "_cygrpc/operation.pyx.pxi"
include "_cygrpc/records.pyx.pxi"
include "_cygrpc/security.pyx.pxi"
include "_cygrpc/server.pyx.pxi"
diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py
index 02d3af8706..eec31bdcf6 100644
--- a/src/python/grpcio/grpc/_server.py
+++ b/src/python/grpcio/grpc/_server.py
@@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0
def _serialized_request(request_event):
- return request_event.batch_operations[0].received_message.bytes()
+ return request_event.batch_operations[0].message()
def _application_code(code):
@@ -130,13 +130,13 @@ def _abort(state, call, code, details):
effective_code = _abortion_code(state, code)
effective_details = details if state.details is None else state.details
if state.initial_metadata_allowed:
- operations = (cygrpc.operation_send_initial_metadata(
- (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server(
+ operations = (cygrpc.SendInitialMetadataOperation(
+ None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN
else:
- operations = (cygrpc.operation_send_status_from_server(
+ operations = (cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, effective_code, effective_details,
_EMPTY_FLAGS),)
token = _SEND_STATUS_FROM_SERVER_TOKEN
@@ -150,8 +150,7 @@ def _receive_close_on_server(state):
def receive_close_on_server(receive_close_on_server_event):
with state.condition:
- if receive_close_on_server_event.batch_operations[
- 0].received_cancelled:
+ if receive_close_on_server_event.batch_operations[0].cancelled():
state.client = _CANCELLED
elif state.client is _OPEN:
state.client = _CLOSED
@@ -262,7 +261,7 @@ class _Context(grpc.ServicerContext):
_raise_rpc_error(self._state)
else:
if self._state.initial_metadata_allowed:
- operation = cygrpc.operation_send_initial_metadata(
+ operation = cygrpc.SendInitialMetadataOperation(
initial_metadata, _EMPTY_FLAGS)
self._rpc_event.operation_call.start_server_batch(
(operation,), _send_initial_metadata(self._state))
@@ -305,7 +304,7 @@ class _RequestIterator(object):
raise StopIteration()
else:
self._call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(self._state, self._call,
self._request_deserializer))
self._state.due.add(_RECEIVE_MESSAGE_TOKEN)
@@ -348,7 +347,7 @@ def _unary_request(rpc_event, state, request_deserializer):
return None
else:
rpc_event.operation_call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_receive_message(state, rpc_event.operation_call,
request_deserializer))
state.due.add(_RECEIVE_MESSAGE_TOKEN)
@@ -424,14 +423,15 @@ def _send_response(rpc_event, state, serialized_response):
return False
else:
if state.initial_metadata_allowed:
- operations = (cygrpc.operation_send_initial_metadata(
- (), _EMPTY_FLAGS), cygrpc.operation_send_message(
- serialized_response, _EMPTY_FLAGS),)
+ operations = (cygrpc.SendInitialMetadataOperation(None,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS),)
state.initial_metadata_allowed = False
token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN
else:
- operations = (cygrpc.operation_send_message(serialized_response,
- _EMPTY_FLAGS),)
+ operations = (cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS),)
token = _SEND_MESSAGE_TOKEN
rpc_event.operation_call.start_server_batch(
operations, _send_message(state, token))
@@ -448,16 +448,16 @@ def _status(rpc_event, state, serialized_response):
code = _completion_code(state)
details = _details(state)
operations = [
- cygrpc.operation_send_status_from_server(
+ cygrpc.SendStatusFromServerOperation(
state.trailing_metadata, code, details, _EMPTY_FLAGS),
]
if state.initial_metadata_allowed:
operations.append(
- cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS))
+ cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS))
if serialized_response is not None:
operations.append(
- cygrpc.operation_send_message(serialized_response,
- _EMPTY_FLAGS))
+ cygrpc.SendMessageOperation(serialized_response,
+ _EMPTY_FLAGS))
rpc_event.operation_call.start_server_batch(
operations,
_send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN))
@@ -563,10 +563,10 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline):
def _reject_rpc(rpc_event, status, details):
- operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS),
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server((), status, details,
- _EMPTY_FLAGS),)
+ operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS),
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(None, status, details,
+ _EMPTY_FLAGS),)
rpc_state = _RPCState()
rpc_event.operation_call.start_server_batch(
operations, lambda ignored_event: (rpc_state, (),))
@@ -577,7 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool):
state = _RPCState()
with state.condition:
rpc_event.operation_call.start_server_batch(
- (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_receive_close_on_server(state))
state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN)
if method_handler.request_streaming:
diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json
index 3bf5308749..e033c1063f 100644
--- a/src/python/grpcio_tests/tests/tests.json
+++ b/src/python/grpcio_tests/tests/tests.json
@@ -34,6 +34,7 @@
"unit._cython._no_messages_server_completion_queue_per_call_test.Test",
"unit._cython._no_messages_single_server_completion_queue_test.Test",
"unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest",
+ "unit._cython._server_test.Test",
"unit._cython.cygrpc_test.InsecureServerInsecureClient",
"unit._cython.cygrpc_test.SecureServerSecureClient",
"unit._cython.cygrpc_test.TypeSmokeTest",
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
index 7df8e2fde6..75b6b9e928 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py
@@ -65,10 +65,10 @@ class _Handler(object):
with self._lock:
self._call.start_server_batch(
- (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),),
_RECEIVE_CLOSE_ON_SERVER_TAG)
self._call.start_server_batch(
- (cygrpc.operation_receive_message(_EMPTY_FLAGS),),
+ (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
_RECEIVE_MESSAGE_TAG)
first_event = self._completion_queue.poll()
if _is_cancellation_event(first_event):
@@ -76,10 +76,10 @@ class _Handler(object):
else:
with self._lock:
operations = (
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!',
_EMPTY_FLAGS),)
self._call.start_server_batch(operations,
@@ -170,13 +170,13 @@ class CancelManyCallsTest(unittest.TestCase):
None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies',
None, _INFINITE_FUTURE)
operations = (
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),)
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),)
tag = 'client_complete_call_{0:04d}_tag'.format(index)
client_call.start_client_batch(operations, tag)
client_due.add(tag)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
index d08003af44..41291cc88f 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py
@@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(
- _common.EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
+ self.assertEqual(cygrpc.CallError.ok,
+ client_receive_initial_metadata_start_batch_result)
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(
- _common.EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(
- _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
+ self.assertEqual(cygrpc.CallError.ok,
+ client_complete_rpc_start_batch_result)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
client_complete_rpc_tag,
@@ -72,7 +73,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_call_driver.add_due({
@@ -84,9 +85,8 @@ class Test(_common.RpcTest, unittest.TestCase):
with server_call_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(
- _common.EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
b'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
index d0166a2b29..b429a20ed7 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py
@@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(
- _common.EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_complete_rpc_start_batch_result = client_call.start_client_batch(
[
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INVOCATION_METADATA, _common.EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(
- _common.EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(
- _common.EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS),
], client_complete_rpc_tag)
self.client_driver.add_due({
client_receive_initial_metadata_tag,
@@ -67,7 +64,7 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_send_initial_metadata_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
_common.INITIAL_METADATA, _common.EMPTY_FLAGS),
], server_send_initial_metadata_tag))
self.server_driver.add_due({
@@ -79,11 +76,10 @@ class Test(_common.RpcTest, unittest.TestCase):
with self.server_condition:
server_complete_rpc_start_batch_result = (
server_request_call_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(
- _common.EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
_common.TRAILING_METADATA, cygrpc.StatusCode.ok,
- b'test details', _common.EMPTY_FLAGS),
+ 'test details', _common.EMPTY_FLAGS),
], server_complete_rpc_tag))
self.server_driver.add_due({
server_complete_rpc_tag,
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
index c7d19058da..87d0dd7a85 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py
@@ -159,15 +159,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with client_condition:
client_receive_initial_metadata_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], client_receive_initial_metadata_tag))
client_due.add(client_receive_initial_metadata_tag)
client_complete_rpc_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_complete_rpc_tag))
client_due.add(client_complete_rpc_tag)
@@ -176,12 +176,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_initial_metadata_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(_EMPTY_METADATA,
- _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA,
+ _EMPTY_FLAGS),
], server_send_initial_metadata_tag))
server_send_first_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_first_message_tag))
server_send_initial_metadata_event = server_call_driver.event_with_tag(
server_send_initial_metadata_tag)
@@ -190,12 +190,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
with server_call_condition:
server_send_second_message_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS),
], server_send_second_message_tag))
server_complete_rpc_start_batch_result = (
server_rpc_event.operation_call.start_server_batch([
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
(), cygrpc.StatusCode.ok, b'test details',
_EMPTY_FLAGS),
], server_complete_rpc_tag))
@@ -209,7 +209,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase):
client_receive_first_message_tag = 'client_receive_first_message_tag'
client_receive_first_message_start_batch_result = (
client_call.start_client_batch([
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], client_receive_first_message_tag))
client_due.add(client_receive_first_message_tag)
client_receive_first_message_event = client_driver.event_with_tag(
diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
new file mode 100644
index 0000000000..12bf40be6b
--- /dev/null
+++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py
@@ -0,0 +1,49 @@
+# Copyright 2017 gRPC authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test servers at the level of the Cython API."""
+
+import threading
+import time
+import unittest
+
+from grpc._cython import cygrpc
+
+
+class Test(unittest.TestCase):
+
+ def test_lonely_server(self):
+ server_call_completion_queue = cygrpc.CompletionQueue()
+ server_shutdown_completion_queue = cygrpc.CompletionQueue()
+ server = cygrpc.Server(cygrpc.ChannelArgs([]))
+ server.register_completion_queue(server_call_completion_queue)
+ server.register_completion_queue(server_shutdown_completion_queue)
+ port = server.add_http2_port(b'[::]:0')
+ server.start()
+
+ server_request_call_tag = 'server_request_call_tag'
+ server_request_call_start_batch_result = server.request_call(
+ server_call_completion_queue, server_call_completion_queue,
+ server_request_call_tag)
+
+ time.sleep(4)
+
+ server_shutdown_tag = 'server_shutdown_tag'
+ server_shutdown_result = server.shutdown(
+ server_shutdown_completion_queue, server_shutdown_tag)
+ server_request_call_event = server_call_completion_queue.poll()
+ server_shutdown_event = server_shutdown_completion_queue.poll()
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
index 33a35ae235..e34892c779 100644
--- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
+++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py
@@ -35,11 +35,6 @@ def _metadata_plugin(context, callback):
class TypeSmokeTest(unittest.TestCase):
- def testOperationFlags(self):
- operation = cygrpc.operation_send_message(b'asdf',
- cygrpc.WriteFlag.no_compress)
- self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags)
-
def testTimespec(self):
now = time.time()
now_timespec_a = cygrpc.Timespec(now)
@@ -174,7 +169,7 @@ class ServerClientMixin(object):
SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought'
SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
SERVER_STATUS_CODE = cygrpc.StatusCode.ok
- SERVER_STATUS_DETAILS = b'our work is never over'
+ SERVER_STATUS_DETAILS = 'our work is never over'
REQUEST = b'in death a member of project mayhem has a name'
RESPONSE = b'his name is robert paulson'
METHOD = b'twinkies'
@@ -196,13 +191,13 @@ class ServerClientMixin(object):
(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,),
(CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),)
client_start_batch_result = client_call.start_client_batch([
- cygrpc.operation_send_initial_metadata(client_initial_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS),
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ cygrpc.SendInitialMetadataOperation(client_initial_metadata,
+ _EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS),
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),
], client_call_tag)
self.assertEqual(cygrpc.CallError.ok, client_start_batch_result)
client_event_future = test_utilities.CompletionQueuePollFuture(
@@ -231,12 +226,12 @@ class ServerClientMixin(object):
server_trailing_metadata = (
(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),)
server_start_batch_result = server_call.start_server_batch([
- cygrpc.operation_send_initial_metadata(
+ cygrpc.SendInitialMetadataOperation(
server_initial_metadata,
- _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS),
- cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS),
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS),
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
server_trailing_metadata, SERVER_STATUS_CODE,
SERVER_STATUS_DETAILS, _EMPTY_FLAGS)
], server_call_tag)
@@ -249,25 +244,24 @@ class ServerClientMixin(object):
found_client_op_types = set()
for client_result in client_event.batch_operations:
# we expect each op type to be unique
- self.assertNotIn(client_result.type, found_client_op_types)
- found_client_op_types.add(client_result.type)
- if client_result.type == cygrpc.OperationType.receive_initial_metadata:
+ self.assertNotIn(client_result.type(), found_client_op_types)
+ found_client_op_types.add(client_result.type())
+ if client_result.type(
+ ) == cygrpc.OperationType.receive_initial_metadata:
self.assertTrue(
test_common.metadata_transmitted(
server_initial_metadata,
- client_result.received_metadata))
- elif client_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(RESPONSE,
- client_result.received_message.bytes())
- elif client_result.type == cygrpc.OperationType.receive_status_on_client:
+ client_result.initial_metadata()))
+ elif client_result.type() == cygrpc.OperationType.receive_message:
+ self.assertEqual(RESPONSE, client_result.message())
+ elif client_result.type(
+ ) == cygrpc.OperationType.receive_status_on_client:
self.assertTrue(
test_common.metadata_transmitted(
server_trailing_metadata,
- client_result.received_metadata))
- self.assertEqual(SERVER_STATUS_DETAILS,
- client_result.received_status_details)
- self.assertEqual(SERVER_STATUS_CODE,
- client_result.received_status_code)
+ client_result.trailing_metadata()))
+ self.assertEqual(SERVER_STATUS_DETAILS, client_result.details())
+ self.assertEqual(SERVER_STATUS_CODE, client_result.code())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@@ -281,13 +275,13 @@ class ServerClientMixin(object):
self.assertEqual(5, len(server_event.batch_operations))
found_server_op_types = set()
for server_result in server_event.batch_operations:
- self.assertNotIn(client_result.type, found_server_op_types)
- found_server_op_types.add(server_result.type)
- if server_result.type == cygrpc.OperationType.receive_message:
- self.assertEqual(REQUEST,
- server_result.received_message.bytes())
- elif server_result.type == cygrpc.OperationType.receive_close_on_server:
- self.assertFalse(server_result.received_cancelled)
+ self.assertNotIn(client_result.type(), found_server_op_types)
+ found_server_op_types.add(server_result.type())
+ if server_result.type() == cygrpc.OperationType.receive_message:
+ self.assertEqual(REQUEST, server_result.message())
+ elif server_result.type(
+ ) == cygrpc.OperationType.receive_close_on_server:
+ self.assertFalse(server_result.cancelled())
self.assertEqual(
set([
cygrpc.OperationType.send_initial_metadata,
@@ -323,9 +317,8 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
client_event_future = perform_client_operations([
- cygrpc.operation_send_initial_metadata(empty_metadata,
- _EMPTY_FLAGS),
- cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
+ cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),
], "Client prologue")
request_event = self.server_completion_queue.poll(cygrpc_deadline)
@@ -337,8 +330,7 @@ class ServerClientMixin(object):
cygrpc_deadline, description)
server_event_future = perform_server_operations([
- cygrpc.operation_send_initial_metadata(empty_metadata,
- _EMPTY_FLAGS),
+ cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS),
], "Server prologue")
client_event_future.result() # force completion
@@ -347,12 +339,12 @@ class ServerClientMixin(object):
# Messaging
for _ in range(10):
client_event_future = perform_client_operations([
- cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Client message")
server_event_future = perform_server_operations([
- cygrpc.operation_send_message(b'', _EMPTY_FLAGS),
- cygrpc.operation_receive_message(_EMPTY_FLAGS),
+ cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS),
+ cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),
], "Server receive")
client_event_future.result() # force completion
@@ -360,13 +352,13 @@ class ServerClientMixin(object):
# Epilogue
client_event_future = perform_client_operations([
- cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),
- cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS)
+ cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),
+ cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS)
], "Client epilogue")
server_event_future = perform_server_operations([
- cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),
- cygrpc.operation_send_status_from_server(
+ cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),
+ cygrpc.SendStatusFromServerOperation(
empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS)
], "Server epilogue")