diff options
Diffstat (limited to 'src')
22 files changed, 579 insertions, 498 deletions
diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index 92b4b8b90b..1ab7e516de 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -1232,8 +1232,6 @@ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { /* If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return * NULL */ const grpc_event_engine_vtable* grpc_init_epoll1_linux(bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epoll1 because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index df2f629703..5f5f45a7a5 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -1449,8 +1449,6 @@ const grpc_event_engine_vtable* grpc_init_epollex_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollex_linux( bool explicitly_requested) { - gpr_log(GPR_ERROR, - "Skipping epollex because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index bc548a1fda..8072a6cbed 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -1732,8 +1732,6 @@ const grpc_event_engine_vtable* grpc_init_epollsig_linux( * NULL */ const grpc_event_engine_vtable* grpc_init_epollsig_linux( bool explicit_request) { - gpr_log(GPR_ERROR, - "Skipping epollsig because GRPC_LINUX_EPOLL is not defined."); return nullptr; } #endif /* defined(GRPC_POSIX_SOCKET) */ diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 53de94fb6e..a569f674f6 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -71,6 +71,7 @@ struct grpc_fd { int shutdown; int closed; int released; + gpr_atm pollhup; grpc_error* shutdown_error; /* The watcher list. @@ -335,6 +336,7 @@ static grpc_fd* fd_create(int fd, const char* name) { r->on_done_closure = nullptr; r->closed = 0; r->released = 0; + gpr_atm_no_barrier_store(&r->pollhup, 0); r->read_notifier_pollset = nullptr; char* name2; @@ -950,7 +952,8 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[0].events = POLLIN; pfds[0].revents = 0; for (i = 0; i < pollset->fd_count; i++) { - if (fd_is_orphaned(pollset->fds[i])) { + if (fd_is_orphaned(pollset->fds[i]) || + gpr_atm_no_barrier_load(&pollset->fds[i]->pollhup) == 1) { GRPC_FD_UNREF(pollset->fds[i], "multipoller"); } else { pollset->fds[fd_count++] = pollset->fds[i]; @@ -1017,6 +1020,12 @@ static grpc_error* pollset_work(grpc_pollset* pollset, pfds[i].fd, (pfds[i].revents & POLLIN_CHECK) != 0, (pfds[i].revents & POLLOUT_CHECK) != 0, pfds[i].revents); } + /* This is a mitigation to prevent poll() from spinning on a + ** POLLHUP https://github.com/grpc/grpc/pull/13665 + */ + if (pfds[i].revents & POLLHUP) { + gpr_atm_no_barrier_store(&watchers[i].fd->pollhup, 1); + } fd_end_poll(&watchers[i], pfds[i].revents & POLLIN_CHECK, pfds[i].revents & POLLOUT_CHECK, pollset); } diff --git a/src/node/health_check/package.json b/src/node/health_check/package.json deleted file mode 100644 index fca3a2a7a6..0000000000 --- a/src/node/health_check/package.json +++ /dev/null @@ -1,29 +0,0 @@ -{ - "name": "grpc-health-check", - "version": "1.7.2", - "author": "Google Inc.", - "description": "Health check service for use with gRPC", - "repository": { - "type": "git", - "url": "https://github.com/grpc/grpc.git" - }, - "bugs": "https://github.com/grpc/grpc/issues", - "contributors": [ - { - "name": "Michael Lumish", - "email": "mlumish@google.com" - } - ], - "dependencies": { - "grpc": "^1.7.2", - "lodash": "^3.9.3", - "google-protobuf": "^3.0.0" - }, - "files": [ - "LICENSE", - "health.js", - "v1" - ], - "main": "src/node/index.js", - "license": "Apache-2.0" -} diff --git a/src/node/tools/package.json b/src/node/tools/package.json deleted file mode 100644 index 99fd854067..0000000000 --- a/src/node/tools/package.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "name": "grpc-tools", - "version": "1.7.2", - "author": "Google Inc.", - "description": "Tools for developing with gRPC on Node.js", - "homepage": "https://grpc.io/", - "repository": { - "type": "git", - "url": "https://github.com/grpc/grpc.git" - }, - "bugs": "https://github.com/grpc/grpc/issues", - "contributors": [ - { - "name": "Michael Lumish", - "email": "mlumish@google.com" - } - ], - "bin": { - "grpc_tools_node_protoc": "./bin/protoc.js", - "grpc_tools_node_protoc_plugin": "./bin/protoc_plugin.js" - }, - "scripts": { - "install": "./node_modules/.bin/node-pre-gyp install" - }, - "bundledDependencies": ["node-pre-gyp"], - "binary": { - "module_name": "grpc_tools", - "host": "https://storage.googleapis.com/", - "remote_path": "grpc-precompiled-binaries/node/{name}/v{version}", - "package_name": "{platform}-{arch}.tar.gz", - "module_path": "bin" - }, - "files": [ - "index.js", - "bin/protoc.js", - "bin/protoc_plugin.js", - "bin/google/protobuf", - "LICENSE" - ], - "main": "index.js" -} diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index d7456a3dd1..3572737c87 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -129,12 +129,12 @@ def _abort(state, code, details): def _handle_event(event, state, response_deserializer): callbacks = [] for batch_operation in event.batch_operations: - operation_type = batch_operation.type + operation_type = batch_operation.type() state.due.remove(operation_type) if operation_type == cygrpc.OperationType.receive_initial_metadata: - state.initial_metadata = batch_operation.received_metadata + state.initial_metadata = batch_operation.initial_metadata() elif operation_type == cygrpc.OperationType.receive_message: - serialized_response = batch_operation.received_message.bytes() + serialized_response = batch_operation.message() if serialized_response is not None: response = _common.deserialize(serialized_response, response_deserializer) @@ -144,18 +144,17 @@ def _handle_event(event, state, response_deserializer): else: state.response = response elif operation_type == cygrpc.OperationType.receive_status_on_client: - state.trailing_metadata = batch_operation.received_metadata + state.trailing_metadata = batch_operation.trailing_metadata() if state.code is None: code = _common.CYGRPC_STATUS_CODE_TO_STATUS_CODE.get( - batch_operation.received_status_code) + batch_operation.code()) if code is None: state.code = grpc.StatusCode.UNKNOWN state.details = _unknown_code_details( - batch_operation.received_status_code, - batch_operation.received_status_details) + code, batch_operation.details()) else: state.code = code - state.details = batch_operation.received_status_details + state.details = batch_operation.details() callbacks.extend(state.callbacks) state.callbacks = None return callbacks @@ -200,7 +199,7 @@ def _consume_request_iterator(request_iterator, state, call, _abort(state, grpc.StatusCode.INTERNAL, details) return else: - operations = (cygrpc.operation_send_message( + operations = (cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_message) @@ -216,7 +215,7 @@ def _consume_request_iterator(request_iterator, state, call, with state.condition: if state.code is None: operations = ( - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),) call.start_client_batch(operations, event_handler) state.due.add(cygrpc.OperationType.send_close_from_client) @@ -319,7 +318,7 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): event_handler = _event_handler(self._state, self._call, self._response_deserializer) self._call.start_client_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), event_handler) self._state.due.add(cygrpc.OperationType.receive_message) elif self._state.code is grpc.StatusCode.OK: @@ -453,12 +452,12 @@ class _UnaryUnaryMultiCallable(grpc.UnaryUnaryMultiCallable): else: state = _RPCState(_UNARY_UNARY_INITIAL_DUE, None, None, None, None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_send_message(serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_request, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) return state, operations, deadline, deadline_timespec, None def _blocking(self, request, timeout, metadata, credentials): @@ -536,14 +535,14 @@ class _UnaryStreamMultiCallable(grpc.UnaryStreamMultiCallable): self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata( - metadata, _EMPTY_FLAGS), cygrpc.operation_send_message( + cygrpc.SendInitialMetadataOperation( + metadata, _EMPTY_FLAGS), cygrpc.SendMessageOperation( serialized_request, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -573,12 +572,11 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): call.set_credentials(credentials._credentials) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), - None) + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), None) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, None) _check_call_error(call_error, metadata) _consume_request_iterator(request_iterator, state, call, @@ -624,12 +622,12 @@ class _StreamUnaryMultiCallable(grpc.StreamUnaryMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) @@ -664,11 +662,11 @@ class _StreamStreamMultiCallable(grpc.StreamStreamMultiCallable): event_handler = _event_handler(state, call, self._response_deserializer) with state.condition: call.start_client_batch( - (cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS),), + (cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS),), event_handler) operations = ( - cygrpc.operation_send_initial_metadata(metadata, _EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(metadata, _EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) call_error = call.start_client_batch(operations, event_handler) if call_error != cygrpc.CallError.ok: _call_error_set_RPCstate(state, call_error, metadata) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 6a72bbf693..6ee833697d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -17,6 +17,7 @@ cimport libc.time # Typedef types with approximately the same semantics to provide their names to # Cython +ctypedef unsigned char uint8_t ctypedef int int32_t ctypedef unsigned uint32_t ctypedef long int64_t @@ -25,6 +26,7 @@ ctypedef long int64_t cdef extern from "grpc/support/alloc.h": void *gpr_malloc(size_t size) nogil + void *gpr_zalloc(size_t size) nogil void gpr_free(void *ptr) nogil void *gpr_realloc(void *p, size_t size) nogil @@ -183,6 +185,18 @@ cdef extern from "grpc/grpc.h": size_t arguments_length "num_args" grpc_arg *arguments "args" + ctypedef enum grpc_compression_level: + GRPC_COMPRESS_LEVEL_NONE + GRPC_COMPRESS_LEVEL_LOW + GRPC_COMPRESS_LEVEL_MED + GRPC_COMPRESS_LEVEL_HIGH + + ctypedef enum grpc_stream_compression_level: + GRPC_STREAM_COMPRESS_LEVEL_NONE + GRPC_STREAM_COMPRESS_LEVEL_LOW + GRPC_STREAM_COMPRESS_LEVEL_MED + GRPC_STREAM_COMPRESS_LEVEL_HIGH + ctypedef enum grpc_call_error: GRPC_CALL_OK GRPC_CALL_ERROR @@ -258,9 +272,19 @@ cdef extern from "grpc/grpc.h": GRPC_OP_RECV_STATUS_ON_CLIENT GRPC_OP_RECV_CLOSE_ON_SERVER + ctypedef struct grpc_op_send_initial_metadata_maybe_compression_level: + uint8_t is_set + grpc_compression_level level + + ctypedef struct grpc_op_send_initial_metadata_maybe_stream_compression_level: + uint8_t is_set + grpc_stream_compression_level level + ctypedef struct grpc_op_data_send_initial_metadata: size_t count grpc_metadata *metadata + grpc_op_send_initial_metadata_maybe_compression_level maybe_compression_level + grpc_op_send_initial_metadata_maybe_stream_compression_level maybe_stream_compression_level ctypedef struct grpc_op_data_send_status_from_server: size_t trailing_metadata_count diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi new file mode 100644 index 0000000000..bfbe27785b --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pxd.pxi @@ -0,0 +1,109 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self) + cdef void un_c(self) + + # TODO(https://github.com/grpc/grpc/issues/7950): Eliminate this! + cdef grpc_op c_op + + +cdef class SendInitialMetadataOperation(Operation): + + cdef readonly object _initial_metadata; + cdef readonly int _flags + cdef grpc_metadata *_c_initial_metadata + cdef size_t _c_initial_metadata_count + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendMessageOperation(Operation): + + cdef readonly bytes _message + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendCloseFromClientOperation(Operation): + + cdef readonly int _flags + + cdef void c(self) + cdef void un_c(self) + + +cdef class SendStatusFromServerOperation(Operation): + + cdef readonly object _trailing_metadata + cdef readonly object _code + cdef readonly object _details + cdef readonly int _flags + cdef grpc_metadata *_c_trailing_metadata + cdef size_t _c_trailing_metadata_count + cdef grpc_slice _c_details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + cdef readonly int _flags + cdef tuple _initial_metadata + cdef grpc_metadata_array _c_initial_metadata + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveMessageOperation(Operation): + + cdef readonly int _flags + cdef grpc_byte_buffer *_c_message_byte_buffer + cdef bytes _message + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveStatusOnClientOperation(Operation): + + cdef readonly int _flags + cdef grpc_metadata_array _c_trailing_metadata + cdef grpc_status_code _c_code + cdef grpc_slice _c_details + cdef tuple _trailing_metadata + cdef object _code + cdef str _details + + cdef void c(self) + cdef void un_c(self) + + +cdef class ReceiveCloseOnServerOperation(Operation): + + cdef readonly int _flags + cdef object _cancelled + cdef int _c_cancelled + + cdef void c(self) + cdef void un_c(self) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi new file mode 100644 index 0000000000..3c91abf722 --- /dev/null +++ b/src/python/grpcio/grpc/_cython/_cygrpc/operation.pyx.pxi @@ -0,0 +1,238 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +cdef class Operation: + + cdef void c(self): + raise NotImplementedError() + + cdef void un_c(self): + raise NotImplementedError() + + +cdef class SendInitialMetadataOperation(Operation): + + def __cinit__(self, initial_metadata, flags): + self._initial_metadata = initial_metadata + self._flags = flags + + def type(self): + return GRPC_OP_SEND_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_INITIAL_METADATA + self.c_op.flags = self._flags + _store_c_metadata( + self._initial_metadata, &self._c_initial_metadata, + &self._c_initial_metadata_count) + self.c_op.data.send_initial_metadata.metadata = self._c_initial_metadata + self.c_op.data.send_initial_metadata.count = self._c_initial_metadata_count + self.c_op.data.send_initial_metadata.maybe_compression_level.is_set = 0 + self.c_op.data.send_initial_metadata.maybe_stream_compression_level.is_set = 0 + + cdef void un_c(self): + _release_c_metadata( + self._c_initial_metadata, self._c_initial_metadata_count) + + +cdef class SendMessageOperation(Operation): + + def __cinit__(self, bytes message, int flags): + self._message = message + self._flags = flags + + def type(self): + return GRPC_OP_SEND_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_MESSAGE + self.c_op.flags = self._flags + cdef grpc_slice message_slice = grpc_slice_from_copied_buffer( + self._message, len(self._message)) + self._c_message_byte_buffer = grpc_raw_byte_buffer_create( + &message_slice, 1) + grpc_slice_unref(message_slice) + self.c_op.data.send_message.send_message = self._c_message_byte_buffer + + cdef void un_c(self): + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + + +cdef class SendCloseFromClientOperation(Operation): + + def __cinit__(self, int flags): + self._flags = flags + + def type(self): + return GRPC_OP_SEND_CLOSE_FROM_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT + self.c_op.flags = self._flags + + cdef void un_c(self): + pass + + +cdef class SendStatusFromServerOperation(Operation): + + def __cinit__(self, trailing_metadata, code, object details, int flags): + self._trailing_metadata = trailing_metadata + self._code = code + self._details = details + self._flags = flags + + def type(self): + return GRPC_OP_SEND_STATUS_FROM_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER + self.c_op.flags = self._flags + _store_c_metadata( + self._trailing_metadata, &self._c_trailing_metadata, + &self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.trailing_metadata = ( + self._c_trailing_metadata) + self.c_op.data.send_status_from_server.trailing_metadata_count = ( + self._c_trailing_metadata_count) + self.c_op.data.send_status_from_server.status = self._code + self._c_details = _slice_from_bytes(_encode(self._details)) + self.c_op.data.send_status_from_server.status_details = &self._c_details + + cdef void un_c(self): + grpc_slice_unref(self._c_details) + _release_c_metadata( + self._c_trailing_metadata, self._c_trailing_metadata_count) + + +cdef class ReceiveInitialMetadataOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_INITIAL_METADATA + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_INITIAL_METADATA + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_initial_metadata) + self.c_op.data.receive_initial_metadata.receive_initial_metadata = ( + &self._c_initial_metadata) + + cdef void un_c(self): + self._initial_metadata = _metadata(&self._c_initial_metadata) + grpc_metadata_array_destroy(&self._c_initial_metadata) + + def initial_metadata(self): + return self._initial_metadata + + +cdef class ReceiveMessageOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_MESSAGE + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_MESSAGE + self.c_op.flags = self._flags + self.c_op.data.receive_message.receive_message = ( + &self._c_message_byte_buffer) + + cdef void un_c(self): + cdef grpc_byte_buffer_reader message_reader + cdef bint message_reader_status + cdef grpc_slice message_slice + cdef size_t message_slice_length + cdef void *message_slice_pointer + if self._c_message_byte_buffer != NULL: + message_reader_status = grpc_byte_buffer_reader_init( + &message_reader, self._c_message_byte_buffer) + if message_reader_status: + message = bytearray() + while grpc_byte_buffer_reader_next(&message_reader, &message_slice): + message_slice_pointer = grpc_slice_start_ptr(message_slice) + message_slice_length = grpc_slice_length(message_slice) + message += (<char *>message_slice_pointer)[:message_slice_length] + grpc_slice_unref(message_slice) + grpc_byte_buffer_reader_destroy(&message_reader) + self._message = bytes(message) + else: + self._message = None + grpc_byte_buffer_destroy(self._c_message_byte_buffer) + else: + self._message = None + + def message(self): + return self._message + + +cdef class ReceiveStatusOnClientOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_STATUS_ON_CLIENT + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT + self.c_op.flags = self._flags + grpc_metadata_array_init(&self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.trailing_metadata = ( + &self._c_trailing_metadata) + self.c_op.data.receive_status_on_client.status = ( + &self._c_code) + self.c_op.data.receive_status_on_client.status_details = ( + &self._c_details) + + cdef void un_c(self): + self._trailing_metadata = _metadata(&self._c_trailing_metadata) + grpc_metadata_array_destroy(&self._c_trailing_metadata) + self._code = self._c_code + self._details = _decode(_slice_bytes(self._c_details)) + grpc_slice_unref(self._c_details) + + def trailing_metadata(self): + return self._trailing_metadata + + def code(self): + return self._code + + def details(self): + return self._details + + +cdef class ReceiveCloseOnServerOperation(Operation): + + def __cinit__(self, flags): + self._flags = flags + + def type(self): + return GRPC_OP_RECV_CLOSE_ON_SERVER + + cdef void c(self): + self.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER + self.c_op.flags = self._flags + self.c_op.data.receive_close_on_server.cancelled = &self._c_cancelled + + cdef void un_c(self): + self._cancelled = bool(self._c_cancelled) + + def cancelled(self): + return self._cancelled diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi index 594fdb1a8b..537cf2b537 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pxd.pxi @@ -65,11 +65,6 @@ cdef class Event: cdef readonly object batch_operations -cdef class ByteBuffer: - - cdef grpc_byte_buffer *c_byte_buffer - - cdef class SslPemKeyCertPair: cdef grpc_ssl_pem_key_cert_pair c_pair @@ -89,22 +84,6 @@ cdef class ChannelArgs: cdef list args -cdef class Operation: - - cdef grpc_op c_op - cdef bint _c_metadata_needs_release - cdef size_t _c_metadata_count - cdef grpc_metadata *_c_metadata - cdef ByteBuffer _received_message - cdef bint _c_metadata_array_needs_destruction - cdef grpc_metadata_array _c_metadata_array - cdef grpc_status_code _received_status_code - cdef grpc_slice _status_details - cdef int _received_cancelled - cdef readonly bint is_valid - cdef object references - - cdef class CompressionOptions: cdef grpc_compression_options c_options diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index 26eaf50eb4..7e6272fe2a 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -229,13 +229,15 @@ cdef class OperationTag: self.c_nops = 0 if self._operations is None else len(self._operations) if 0 < self.c_nops: self.c_ops = <grpc_op *>gpr_malloc(sizeof(grpc_op) * self.c_nops) - for index in range(self.c_nops): - self.c_ops[index] = (<Operation>(self._operations[index])).c_op + for index, operation in enumerate(self._operations): + (<Operation>operation).c() + self.c_ops[index] = (<Operation>operation).c_op cdef object release_ops(self): if 0 < self.c_nops: for index, operation in enumerate(self._operations): (<Operation>operation).c_op = self.c_ops[index] + (<Operation>operation).un_c() gpr_free(self.c_ops) return self._operations else: @@ -260,69 +262,6 @@ cdef class Event: self.is_new_request = is_new_request -cdef class ByteBuffer: - - def __cinit__(self, bytes data): - grpc_init() - if data is None: - self.c_byte_buffer = NULL - return - - cdef char *c_data = data - cdef grpc_slice data_slice - cdef size_t data_length = len(data) - with nogil: - data_slice = grpc_slice_from_copied_buffer(c_data, data_length) - with nogil: - self.c_byte_buffer = grpc_raw_byte_buffer_create( - &data_slice, 1) - with nogil: - grpc_slice_unref(data_slice) - - def bytes(self): - cdef grpc_byte_buffer_reader reader - cdef grpc_slice data_slice - cdef size_t data_slice_length - cdef void *data_slice_pointer - cdef bint reader_status - if self.c_byte_buffer != NULL: - with nogil: - reader_status = grpc_byte_buffer_reader_init( - &reader, self.c_byte_buffer) - if not reader_status: - return None - result = bytearray() - with nogil: - while grpc_byte_buffer_reader_next(&reader, &data_slice): - data_slice_pointer = grpc_slice_start_ptr(data_slice) - data_slice_length = grpc_slice_length(data_slice) - with gil: - result += (<char *>data_slice_pointer)[:data_slice_length] - grpc_slice_unref(data_slice) - with nogil: - grpc_byte_buffer_reader_destroy(&reader) - return bytes(result) - else: - return None - - def __len__(self): - cdef size_t result - if self.c_byte_buffer != NULL: - with nogil: - result = grpc_byte_buffer_length(self.c_byte_buffer) - return result - else: - return 0 - - def __str__(self): - return self.bytes() - - def __dealloc__(self): - if self.c_byte_buffer != NULL: - grpc_byte_buffer_destroy(self.c_byte_buffer) - grpc_shutdown() - - cdef class SslPemKeyCertPair: def __cinit__(self, bytes private_key, bytes certificate_chain): @@ -407,185 +346,6 @@ cdef class ChannelArgs: return self.args[i] -cdef class Operation: - - def __cinit__(self): - grpc_init() - self.references = [] - self._c_metadata_needs_release = False - self._c_metadata_array_needs_destruction = False - self._status_details = grpc_empty_slice() - self.is_valid = False - - @property - def type(self): - return self.c_op.type - - @property - def flags(self): - return self.c_op.flags - - @property - def has_status(self): - return self.c_op.type == GRPC_OP_RECV_STATUS_ON_CLIENT - - @property - def received_message(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - raise TypeError("self must be an operation receiving a message") - return self._received_message - - @property - def received_message_or_none(self): - if self.c_op.type != GRPC_OP_RECV_MESSAGE: - return None - return self._received_message - - @property - def received_metadata(self): - if (self.c_op.type != GRPC_OP_RECV_INITIAL_METADATA and - self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT): - raise TypeError("self must be an operation receiving metadata") - return _metadata(&self._c_metadata_array) - - @property - def received_status_code(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving a status code") - return self._received_status_code - - @property - def received_status_code_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return self._received_status_code - - @property - def received_status_details(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - raise TypeError("self must be an operation receiving status details") - return _slice_bytes(self._status_details) - - @property - def received_status_details_or_none(self): - if self.c_op.type != GRPC_OP_RECV_STATUS_ON_CLIENT: - return None - return _slice_bytes(self._status_details) - - @property - def received_cancelled(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - raise TypeError("self must be an operation receiving cancellation " - "information") - return False if self._received_cancelled == 0 else True - - @property - def received_cancelled_or_none(self): - if self.c_op.type != GRPC_OP_RECV_CLOSE_ON_SERVER: - return None - return False if self._received_cancelled == 0 else True - - def __dealloc__(self): - if self._c_metadata_needs_release: - _release_c_metadata(self._c_metadata, self._c_metadata_count) - if self._c_metadata_array_needs_destruction: - grpc_metadata_array_destroy(&self._c_metadata_array) - grpc_slice_unref(self._status_details) - grpc_shutdown() - -def operation_send_initial_metadata(metadata, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_INITIAL_METADATA - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_initial_metadata.count = op._c_metadata_count - op.c_op.data.send_initial_metadata.metadata = op._c_metadata - op.is_valid = True - return op - -def operation_send_message(data, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_MESSAGE - op.c_op.flags = flags - byte_buffer = ByteBuffer(data) - op.c_op.data.send_message.send_message = byte_buffer.c_byte_buffer - op.references.append(byte_buffer) - op.is_valid = True - return op - -def operation_send_close_from_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_CLOSE_FROM_CLIENT - op.c_op.flags = flags - op.is_valid = True - return op - -def operation_send_status_from_server( - metadata, grpc_status_code code, bytes details, int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_SEND_STATUS_FROM_SERVER - op.c_op.flags = flags - _store_c_metadata(metadata, &op._c_metadata, &op._c_metadata_count) - op._c_metadata_needs_release = True - op.c_op.data.send_status_from_server.trailing_metadata_count = ( - op._c_metadata_count) - op.c_op.data.send_status_from_server.trailing_metadata = op._c_metadata - op.c_op.data.send_status_from_server.status = code - grpc_slice_unref(op._status_details) - op._status_details = _slice_from_bytes(details) - op.c_op.data.send_status_from_server.status_details = &op._status_details - op.is_valid = True - return op - -def operation_receive_initial_metadata(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_INITIAL_METADATA - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_initial_metadata.receive_initial_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.is_valid = True - return op - -def operation_receive_message(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_MESSAGE - op.c_op.flags = flags - op._received_message = ByteBuffer(None) - # n.b. the c_op.data.receive_message field needs to be deleted by us, - # anyway, so we just let that be handled by the ByteBuffer() we allocated - # the line before. - op.c_op.data.receive_message.receive_message = ( - &op._received_message.c_byte_buffer) - op.is_valid = True - return op - -def operation_receive_status_on_client(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_STATUS_ON_CLIENT - op.c_op.flags = flags - grpc_metadata_array_init(&op._c_metadata_array) - op.c_op.data.receive_status_on_client.trailing_metadata = ( - &op._c_metadata_array) - op._c_metadata_array_needs_destruction = True - op.c_op.data.receive_status_on_client.status = ( - &op._received_status_code) - op.c_op.data.receive_status_on_client.status_details = ( - &op._status_details) - op.is_valid = True - return op - -def operation_receive_close_on_server(int flags): - cdef Operation op = Operation() - op.c_op.type = GRPC_OP_RECV_CLOSE_ON_SERVER - op.c_op.flags = flags - op.c_op.data.receive_close_on_server.cancelled = &op._received_cancelled - op.is_valid = True - return op - - cdef class CompressionOptions: def __cinit__(self): diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index 6fc5638d5d..ad229de0ae 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -19,6 +19,7 @@ include "_cygrpc/channel.pxd.pxi" include "_cygrpc/credentials.pxd.pxi" include "_cygrpc/completion_queue.pxd.pxi" include "_cygrpc/metadata.pxd.pxi" +include "_cygrpc/operation.pxd.pxi" include "_cygrpc/records.pxd.pxi" include "_cygrpc/security.pxd.pxi" include "_cygrpc/server.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index d605229822..0964fb66ab 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -26,6 +26,7 @@ include "_cygrpc/channel.pyx.pxi" include "_cygrpc/credentials.pyx.pxi" include "_cygrpc/completion_queue.pyx.pxi" include "_cygrpc/metadata.pyx.pxi" +include "_cygrpc/operation.pyx.pxi" include "_cygrpc/records.pyx.pxi" include "_cygrpc/security.pyx.pxi" include "_cygrpc/server.pyx.pxi" diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index 02d3af8706..eec31bdcf6 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -50,7 +50,7 @@ _UNEXPECTED_EXIT_SERVER_GRACE = 1.0 def _serialized_request(request_event): - return request_event.batch_operations[0].received_message.bytes() + return request_event.batch_operations[0].message() def _application_code(code): @@ -130,13 +130,13 @@ def _abort(state, call, code, details): effective_code = _abortion_code(state, code) effective_details = details if state.details is None else state.details if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendInitialMetadataOperation( + None, _EMPTY_FLAGS), cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_INITIAL_METADATA_AND_SEND_STATUS_FROM_SERVER_TOKEN else: - operations = (cygrpc.operation_send_status_from_server( + operations = (cygrpc.SendStatusFromServerOperation( state.trailing_metadata, effective_code, effective_details, _EMPTY_FLAGS),) token = _SEND_STATUS_FROM_SERVER_TOKEN @@ -150,8 +150,7 @@ def _receive_close_on_server(state): def receive_close_on_server(receive_close_on_server_event): with state.condition: - if receive_close_on_server_event.batch_operations[ - 0].received_cancelled: + if receive_close_on_server_event.batch_operations[0].cancelled(): state.client = _CANCELLED elif state.client is _OPEN: state.client = _CLOSED @@ -262,7 +261,7 @@ class _Context(grpc.ServicerContext): _raise_rpc_error(self._state) else: if self._state.initial_metadata_allowed: - operation = cygrpc.operation_send_initial_metadata( + operation = cygrpc.SendInitialMetadataOperation( initial_metadata, _EMPTY_FLAGS) self._rpc_event.operation_call.start_server_batch( (operation,), _send_initial_metadata(self._state)) @@ -305,7 +304,7 @@ class _RequestIterator(object): raise StopIteration() else: self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(self._state, self._call, self._request_deserializer)) self._state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -348,7 +347,7 @@ def _unary_request(rpc_event, state, request_deserializer): return None else: rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _receive_message(state, rpc_event.operation_call, request_deserializer)) state.due.add(_RECEIVE_MESSAGE_TOKEN) @@ -424,14 +423,15 @@ def _send_response(rpc_event, state, serialized_response): return False else: if state.initial_metadata_allowed: - operations = (cygrpc.operation_send_initial_metadata( - (), _EMPTY_FLAGS), cygrpc.operation_send_message( - serialized_response, _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) state.initial_metadata_allowed = False token = _SEND_INITIAL_METADATA_AND_SEND_MESSAGE_TOKEN else: - operations = (cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS),) + operations = (cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS),) token = _SEND_MESSAGE_TOKEN rpc_event.operation_call.start_server_batch( operations, _send_message(state, token)) @@ -448,16 +448,16 @@ def _status(rpc_event, state, serialized_response): code = _completion_code(state) details = _details(state) operations = [ - cygrpc.operation_send_status_from_server( + cygrpc.SendStatusFromServerOperation( state.trailing_metadata, code, details, _EMPTY_FLAGS), ] if state.initial_metadata_allowed: operations.append( - cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS)) + cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS)) if serialized_response is not None: operations.append( - cygrpc.operation_send_message(serialized_response, - _EMPTY_FLAGS)) + cygrpc.SendMessageOperation(serialized_response, + _EMPTY_FLAGS)) rpc_event.operation_call.start_server_batch( operations, _send_status_from_server(state, _SEND_STATUS_FROM_SERVER_TOKEN)) @@ -563,10 +563,10 @@ def _find_method_handler(rpc_event, generic_handlers, interceptor_pipeline): def _reject_rpc(rpc_event, status, details): - operations = (cygrpc.operation_send_initial_metadata((), _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server((), status, details, - _EMPTY_FLAGS),) + operations = (cygrpc.SendInitialMetadataOperation(None, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation(None, status, details, + _EMPTY_FLAGS),) rpc_state = _RPCState() rpc_event.operation_call.start_server_batch( operations, lambda ignored_event: (rpc_state, (),)) @@ -577,7 +577,7 @@ def _handle_with_method_handler(rpc_event, method_handler, thread_pool): state = _RPCState() with state.condition: rpc_event.operation_call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _receive_close_on_server(state)) state.due.add(_RECEIVE_CLOSE_ON_SERVER_TOKEN) if method_handler.request_streaming: diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 3bf5308749..e033c1063f 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -34,6 +34,7 @@ "unit._cython._no_messages_server_completion_queue_per_call_test.Test", "unit._cython._no_messages_single_server_completion_queue_test.Test", "unit._cython._read_some_but_not_all_responses_test.ReadSomeButNotAllResponsesTest", + "unit._cython._server_test.Test", "unit._cython.cygrpc_test.InsecureServerInsecureClient", "unit._cython.cygrpc_test.SecureServerSecureClient", "unit._cython.cygrpc_test.TypeSmokeTest", diff --git a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py index 7df8e2fde6..75b6b9e928 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_cancel_many_calls_test.py @@ -65,10 +65,10 @@ class _Handler(object): with self._lock: self._call.start_server_batch( - (cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS),), + (cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS),), _RECEIVE_CLOSE_ON_SERVER_TAG) self._call.start_server_batch( - (cygrpc.operation_receive_message(_EMPTY_FLAGS),), + (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), _RECEIVE_MESSAGE_TAG) first_event = self._completion_queue.poll() if _is_cancellation_event(first_event): @@ -76,10 +76,10 @@ class _Handler(object): else: with self._lock: operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x79\x57', _EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x79\x57', _EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _EMPTY_METADATA, cygrpc.StatusCode.ok, b'test details!', _EMPTY_FLAGS),) self._call.start_server_batch(operations, @@ -170,13 +170,13 @@ class CancelManyCallsTest(unittest.TestCase): None, _EMPTY_FLAGS, client_completion_queue, b'/twinkies', None, _INFINITE_FUTURE) operations = ( - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_message(b'\x45\x56', _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS),) + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x45\x56', _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS),) tag = 'client_complete_call_{0:04d}_tag'.format(index) client_call.start_client_batch(operations, tag) client_due.add(tag) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py index d08003af44..41291cc88f 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_server_completion_queue_per_call_test.py @@ -49,18 +49,19 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) + self.assertEqual(cygrpc.CallError.ok, + client_receive_initial_metadata_start_batch_result) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) + self.assertEqual(cygrpc.CallError.ok, + client_complete_rpc_start_batch_result) self.client_driver.add_due({ client_receive_initial_metadata_tag, client_complete_rpc_tag, @@ -72,7 +73,7 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_call_driver.add_due({ @@ -84,9 +85,8 @@ class Test(_common.RpcTest, unittest.TestCase): with server_call_condition: server_complete_rpc_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, b'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) diff --git a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py index d0166a2b29..b429a20ed7 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_no_messages_single_server_completion_queue_test.py @@ -44,17 +44,14 @@ class Test(_common.RpcTest, unittest.TestCase): with self.client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata( - _common.EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_common.EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_complete_rpc_start_batch_result = client_call.start_client_batch( [ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INVOCATION_METADATA, _common.EMPTY_FLAGS), - cygrpc.operation_send_close_from_client( - _common.EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client( - _common.EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_common.EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_common.EMPTY_FLAGS), ], client_complete_rpc_tag) self.client_driver.add_due({ client_receive_initial_metadata_tag, @@ -67,7 +64,7 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_send_initial_metadata_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( _common.INITIAL_METADATA, _common.EMPTY_FLAGS), ], server_send_initial_metadata_tag)) self.server_driver.add_due({ @@ -79,11 +76,10 @@ class Test(_common.RpcTest, unittest.TestCase): with self.server_condition: server_complete_rpc_start_batch_result = ( server_request_call_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server( - _common.EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_common.EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( _common.TRAILING_METADATA, cygrpc.StatusCode.ok, - b'test details', _common.EMPTY_FLAGS), + 'test details', _common.EMPTY_FLAGS), ], server_complete_rpc_tag)) self.server_driver.add_due({ server_complete_rpc_tag, diff --git a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py index c7d19058da..87d0dd7a85 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/_read_some_but_not_all_responses_test.py @@ -159,15 +159,15 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with client_condition: client_receive_initial_metadata_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], client_receive_initial_metadata_tag)) client_due.add(client_receive_initial_metadata_tag) client_complete_rpc_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_complete_rpc_tag)) client_due.add(client_complete_rpc_tag) @@ -176,12 +176,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_initial_metadata_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_initial_metadata(_EMPTY_METADATA, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(_EMPTY_METADATA, + _EMPTY_FLAGS), ], server_send_initial_metadata_tag)) server_send_first_message_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_first_message_tag)) server_send_initial_metadata_event = server_call_driver.event_with_tag( server_send_initial_metadata_tag) @@ -190,12 +190,12 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): with server_call_condition: server_send_second_message_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_send_message(b'\x07', _EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'\x07', _EMPTY_FLAGS), ], server_send_second_message_tag)) server_complete_rpc_start_batch_result = ( server_rpc_event.operation_call.start_server_batch([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( (), cygrpc.StatusCode.ok, b'test details', _EMPTY_FLAGS), ], server_complete_rpc_tag)) @@ -209,7 +209,7 @@ class ReadSomeButNotAllResponsesTest(unittest.TestCase): client_receive_first_message_tag = 'client_receive_first_message_tag' client_receive_first_message_start_batch_result = ( client_call.start_client_batch([ - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], client_receive_first_message_tag)) client_due.add(client_receive_first_message_tag) client_receive_first_message_event = client_driver.event_with_tag( diff --git a/src/python/grpcio_tests/tests/unit/_cython/_server_test.py b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py new file mode 100644 index 0000000000..12bf40be6b --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_cython/_server_test.py @@ -0,0 +1,49 @@ +# Copyright 2017 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Test servers at the level of the Cython API.""" + +import threading +import time +import unittest + +from grpc._cython import cygrpc + + +class Test(unittest.TestCase): + + def test_lonely_server(self): + server_call_completion_queue = cygrpc.CompletionQueue() + server_shutdown_completion_queue = cygrpc.CompletionQueue() + server = cygrpc.Server(cygrpc.ChannelArgs([])) + server.register_completion_queue(server_call_completion_queue) + server.register_completion_queue(server_shutdown_completion_queue) + port = server.add_http2_port(b'[::]:0') + server.start() + + server_request_call_tag = 'server_request_call_tag' + server_request_call_start_batch_result = server.request_call( + server_call_completion_queue, server_call_completion_queue, + server_request_call_tag) + + time.sleep(4) + + server_shutdown_tag = 'server_shutdown_tag' + server_shutdown_result = server.shutdown( + server_shutdown_completion_queue, server_shutdown_tag) + server_request_call_event = server_call_completion_queue.poll() + server_shutdown_event = server_shutdown_completion_queue.poll() + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py index 33a35ae235..e34892c779 100644 --- a/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py +++ b/src/python/grpcio_tests/tests/unit/_cython/cygrpc_test.py @@ -35,11 +35,6 @@ def _metadata_plugin(context, callback): class TypeSmokeTest(unittest.TestCase): - def testOperationFlags(self): - operation = cygrpc.operation_send_message(b'asdf', - cygrpc.WriteFlag.no_compress) - self.assertEqual(cygrpc.WriteFlag.no_compress, operation.flags) - def testTimespec(self): now = time.time() now_timespec_a = cygrpc.Timespec(now) @@ -174,7 +169,7 @@ class ServerClientMixin(object): SERVER_TRAILING_METADATA_KEY = 'california_is_in_a_drought' SERVER_TRAILING_METADATA_VALUE = 'zomg it is' SERVER_STATUS_CODE = cygrpc.StatusCode.ok - SERVER_STATUS_DETAILS = b'our work is never over' + SERVER_STATUS_DETAILS = 'our work is never over' REQUEST = b'in death a member of project mayhem has a name' RESPONSE = b'his name is robert paulson' METHOD = b'twinkies' @@ -196,13 +191,13 @@ class ServerClientMixin(object): (CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE,), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE,),) client_start_batch_result = client_call.start_client_batch([ - cygrpc.operation_send_initial_metadata(client_initial_metadata, - _EMPTY_FLAGS), - cygrpc.operation_send_message(REQUEST, _EMPTY_FLAGS), - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendInitialMetadataOperation(client_initial_metadata, + _EMPTY_FLAGS), + cygrpc.SendMessageOperation(REQUEST, _EMPTY_FLAGS), + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS), ], client_call_tag) self.assertEqual(cygrpc.CallError.ok, client_start_batch_result) client_event_future = test_utilities.CompletionQueuePollFuture( @@ -231,12 +226,12 @@ class ServerClientMixin(object): server_trailing_metadata = ( (SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE,),) server_start_batch_result = server_call.start_server_batch([ - cygrpc.operation_send_initial_metadata( + cygrpc.SendInitialMetadataOperation( server_initial_metadata, - _EMPTY_FLAGS), cygrpc.operation_receive_message(_EMPTY_FLAGS), - cygrpc.operation_send_message(RESPONSE, _EMPTY_FLAGS), - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + _EMPTY_FLAGS), cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(RESPONSE, _EMPTY_FLAGS), + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS, _EMPTY_FLAGS) ], server_call_tag) @@ -249,25 +244,24 @@ class ServerClientMixin(object): found_client_op_types = set() for client_result in client_event.batch_operations: # we expect each op type to be unique - self.assertNotIn(client_result.type, found_client_op_types) - found_client_op_types.add(client_result.type) - if client_result.type == cygrpc.OperationType.receive_initial_metadata: + self.assertNotIn(client_result.type(), found_client_op_types) + found_client_op_types.add(client_result.type()) + if client_result.type( + ) == cygrpc.OperationType.receive_initial_metadata: self.assertTrue( test_common.metadata_transmitted( server_initial_metadata, - client_result.received_metadata)) - elif client_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(RESPONSE, - client_result.received_message.bytes()) - elif client_result.type == cygrpc.OperationType.receive_status_on_client: + client_result.initial_metadata())) + elif client_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(RESPONSE, client_result.message()) + elif client_result.type( + ) == cygrpc.OperationType.receive_status_on_client: self.assertTrue( test_common.metadata_transmitted( server_trailing_metadata, - client_result.received_metadata)) - self.assertEqual(SERVER_STATUS_DETAILS, - client_result.received_status_details) - self.assertEqual(SERVER_STATUS_CODE, - client_result.received_status_code) + client_result.trailing_metadata())) + self.assertEqual(SERVER_STATUS_DETAILS, client_result.details()) + self.assertEqual(SERVER_STATUS_CODE, client_result.code()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -281,13 +275,13 @@ class ServerClientMixin(object): self.assertEqual(5, len(server_event.batch_operations)) found_server_op_types = set() for server_result in server_event.batch_operations: - self.assertNotIn(client_result.type, found_server_op_types) - found_server_op_types.add(server_result.type) - if server_result.type == cygrpc.OperationType.receive_message: - self.assertEqual(REQUEST, - server_result.received_message.bytes()) - elif server_result.type == cygrpc.OperationType.receive_close_on_server: - self.assertFalse(server_result.received_cancelled) + self.assertNotIn(client_result.type(), found_server_op_types) + found_server_op_types.add(server_result.type()) + if server_result.type() == cygrpc.OperationType.receive_message: + self.assertEqual(REQUEST, server_result.message()) + elif server_result.type( + ) == cygrpc.OperationType.receive_close_on_server: + self.assertFalse(server_result.cancelled()) self.assertEqual( set([ cygrpc.OperationType.send_initial_metadata, @@ -323,9 +317,8 @@ class ServerClientMixin(object): cygrpc_deadline, description) client_event_future = perform_client_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), - cygrpc.operation_receive_initial_metadata(_EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), + cygrpc.ReceiveInitialMetadataOperation(_EMPTY_FLAGS), ], "Client prologue") request_event = self.server_completion_queue.poll(cygrpc_deadline) @@ -337,8 +330,7 @@ class ServerClientMixin(object): cygrpc_deadline, description) server_event_future = perform_server_operations([ - cygrpc.operation_send_initial_metadata(empty_metadata, - _EMPTY_FLAGS), + cygrpc.SendInitialMetadataOperation(empty_metadata, _EMPTY_FLAGS), ], "Server prologue") client_event_future.result() # force completion @@ -347,12 +339,12 @@ class ServerClientMixin(object): # Messaging for _ in range(10): client_event_future = perform_client_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Client message") server_event_future = perform_server_operations([ - cygrpc.operation_send_message(b'', _EMPTY_FLAGS), - cygrpc.operation_receive_message(_EMPTY_FLAGS), + cygrpc.SendMessageOperation(b'', _EMPTY_FLAGS), + cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS), ], "Server receive") client_event_future.result() # force completion @@ -360,13 +352,13 @@ class ServerClientMixin(object): # Epilogue client_event_future = perform_client_operations([ - cygrpc.operation_send_close_from_client(_EMPTY_FLAGS), - cygrpc.operation_receive_status_on_client(_EMPTY_FLAGS) + cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS), + cygrpc.ReceiveStatusOnClientOperation(_EMPTY_FLAGS) ], "Client epilogue") server_event_future = perform_server_operations([ - cygrpc.operation_receive_close_on_server(_EMPTY_FLAGS), - cygrpc.operation_send_status_from_server( + cygrpc.ReceiveCloseOnServerOperation(_EMPTY_FLAGS), + cygrpc.SendStatusFromServerOperation( empty_metadata, cygrpc.StatusCode.ok, b'', _EMPTY_FLAGS) ], "Server epilogue") |