diff options
Diffstat (limited to 'src/python/grpcio/grpc/_adapter/fore.py')
-rw-r--r-- | src/python/grpcio/grpc/_adapter/fore.py | 363 |
1 files changed, 0 insertions, 363 deletions
diff --git a/src/python/grpcio/grpc/_adapter/fore.py b/src/python/grpcio/grpc/_adapter/fore.py deleted file mode 100644 index acdd69c420..0000000000 --- a/src/python/grpcio/grpc/_adapter/fore.py +++ /dev/null @@ -1,363 +0,0 @@ -# Copyright 2015, Google Inc. -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are -# met: -# -# * Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# * Redistributions in binary form must reproduce the above -# copyright notice, this list of conditions and the following disclaimer -# in the documentation and/or other materials provided with the -# distribution. -# * Neither the name of Google Inc. nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -"""The RPC-service-side bridge between RPC Framework and GRPC-on-the-wire.""" - -import enum -import logging -import threading -import time - -from grpc._adapter import _common -from grpc._adapter import _intermediary_low as _low -from grpc.framework.base import interfaces as base_interfaces -from grpc.framework.base import null -from grpc.framework.foundation import activated -from grpc.framework.foundation import logging_pool - -_THREAD_POOL_SIZE = 10 - - -@enum.unique -class _LowWrite(enum.Enum): - """The possible categories of low-level write state.""" - - OPEN = 'OPEN' - ACTIVE = 'ACTIVE' - CLOSED = 'CLOSED' - - -def _write(call, rpc_state, payload): - serialized_payload = rpc_state.serializer(payload) - if rpc_state.write.low is _LowWrite.OPEN: - call.write(serialized_payload, call, 0) - rpc_state.write.low = _LowWrite.ACTIVE - else: - rpc_state.write.pending.append(serialized_payload) - - -def _status(call, rpc_state): - call.status(_low.Status(_low.Code.OK, ''), call) - rpc_state.write.low = _LowWrite.CLOSED - - -class ForeLink(base_interfaces.ForeLink, activated.Activated): - """A service-side bridge between RPC Framework and the C-ish _low code.""" - - def __init__( - self, pool, request_deserializers, response_serializers, - root_certificates, key_chain_pairs, port=None): - """Constructor. - - Args: - pool: A thread pool. - request_deserializers: A dict from RPC method names to request object - deserializer behaviors. - response_serializers: A dict from RPC method names to response object - serializer behaviors. - root_certificates: The PEM-encoded client root certificates as a - bytestring or None. - key_chain_pairs: A sequence of PEM-encoded private key-certificate chain - pairs. - port: The port on which to serve, or None to have a port selected - automatically. - """ - self._condition = threading.Condition() - self._pool = pool - self._request_deserializers = request_deserializers - self._response_serializers = response_serializers - self._root_certificates = root_certificates - self._key_chain_pairs = key_chain_pairs - self._requested_port = port - - self._rear_link = null.NULL_REAR_LINK - self._completion_queue = None - self._server = None - self._rpc_states = {} - self._spinning = False - self._port = None - - def _on_stop_event(self): - self._spinning = False - self._condition.notify_all() - - def _on_service_acceptance_event(self, event, server): - """Handle a service invocation event.""" - service_acceptance = event.service_acceptance - if service_acceptance is None: - return - - call = service_acceptance.call - call.accept(self._completion_queue, call) - # TODO(nathaniel): Metadata support. - call.premetadata() - call.read(call) - method = service_acceptance.method - - self._rpc_states[call] = _common.CommonRPCState( - _common.WriteState(_LowWrite.OPEN, _common.HighWrite.OPEN, []), 1, - self._request_deserializers[method], - self._response_serializers[method]) - - ticket = base_interfaces.FrontToBackTicket( - call, 0, base_interfaces.FrontToBackTicket.Kind.COMMENCEMENT, method, - base_interfaces.ServicedSubscription.Kind.FULL, None, None, - service_acceptance.deadline - time.time()) - self._rear_link.accept_front_to_back_ticket(ticket) - - server.service(None) - - def _on_read_event(self, event): - """Handle data arriving during an RPC.""" - call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return - - sequence_number = rpc_state.sequence_number - rpc_state.sequence_number += 1 - if event.bytes is None: - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.COMPLETION, None, None, None, - None, None) - else: - call.read(call) - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.CONTINUATION, None, None, - None, rpc_state.deserializer(event.bytes), None) - - self._rear_link.accept_front_to_back_ticket(ticket) - - def _on_write_event(self, event): - call = event.tag - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return - - if rpc_state.write.pending: - serialized_payload = rpc_state.write.pending.pop(0) - call.write(serialized_payload, call, 0) - elif rpc_state.write.high is _common.HighWrite.CLOSED: - _status(call, rpc_state) - else: - rpc_state.write.low = _LowWrite.OPEN - - def _on_complete_event(self, event): - if not event.complete_accepted: - logging.error('Complete not accepted! %s', (event,)) - call = event.tag - rpc_state = self._rpc_states.pop(call, None) - if rpc_state is None: - return - - sequence_number = rpc_state.sequence_number - rpc_state.sequence_number += 1 - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None, - None, None, None, None) - self._rear_link.accept_front_to_back_ticket(ticket) - - def _on_finish_event(self, event): - """Handle termination of an RPC.""" - call = event.tag - rpc_state = self._rpc_states.pop(call, None) - if rpc_state is None: - return - - code = event.status.code - if code is _low.Code.OK: - return - - sequence_number = rpc_state.sequence_number - rpc_state.sequence_number += 1 - if code is _low.Code.CANCELLED: - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None, - None, None, None) - elif code is _low.Code.DEADLINE_EXCEEDED: - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None, - None, None) - else: - # TODO(nathaniel): Better mapping of codes to ticket-categories - ticket = base_interfaces.FrontToBackTicket( - call, sequence_number, - base_interfaces.FrontToBackTicket.Kind.TRANSMISSION_FAILURE, None, - None, None, None, None) - self._rear_link.accept_front_to_back_ticket(ticket) - - def _spin(self, completion_queue, server): - while True: - event = completion_queue.get(None) - - with self._condition: - if event.kind is _low.Event.Kind.STOP: - self._on_stop_event() - return - elif self._server is None: - continue - elif event.kind is _low.Event.Kind.SERVICE_ACCEPTED: - self._on_service_acceptance_event(event, server) - elif event.kind is _low.Event.Kind.READ_ACCEPTED: - self._on_read_event(event) - elif event.kind is _low.Event.Kind.WRITE_ACCEPTED: - self._on_write_event(event) - elif event.kind is _low.Event.Kind.COMPLETE_ACCEPTED: - self._on_complete_event(event) - elif event.kind is _low.Event.Kind.FINISH: - self._on_finish_event(event) - else: - logging.error('Illegal event! %s', (event,)) - - def _continue(self, call, payload): - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return - - _write(call, rpc_state, payload) - - def _complete(self, call, payload): - """Handle completion of the writes of an RPC.""" - rpc_state = self._rpc_states.get(call, None) - if rpc_state is None: - return - - if rpc_state.write.low is _LowWrite.OPEN: - if payload is None: - _status(call, rpc_state) - else: - _write(call, rpc_state, payload) - elif rpc_state.write.low is _LowWrite.ACTIVE: - if payload is not None: - rpc_state.write.pending.append(rpc_state.serializer(payload)) - else: - raise ValueError('Called to complete after having already completed!') - rpc_state.write.high = _common.HighWrite.CLOSED - - def _cancel(self, call): - call.cancel() - self._rpc_states.pop(call, None) - - def join_rear_link(self, rear_link): - """See base_interfaces.ForeLink.join_rear_link for specification.""" - self._rear_link = null.NULL_REAR_LINK if rear_link is None else rear_link - - def _start(self): - """Starts this ForeLink. - - This method must be called before attempting to exchange tickets with this - object. - """ - with self._condition: - address = '[::]:%d' % ( - 0 if self._requested_port is None else self._requested_port) - self._completion_queue = _low.CompletionQueue() - if self._root_certificates is None and not self._key_chain_pairs: - self._server = _low.Server(self._completion_queue) - self._port = self._server.add_http2_addr(address) - else: - server_credentials = _low.ServerCredentials( - self._root_certificates, self._key_chain_pairs, False) - self._server = _low.Server(self._completion_queue) - self._port = self._server.add_secure_http2_addr( - address, server_credentials) - self._server.start() - - self._server.service(None) - - self._pool.submit(self._spin, self._completion_queue, self._server) - self._spinning = True - - return self - - # TODO(nathaniel): Expose graceful-shutdown semantics in which this object - # enters a state in which it finishes ongoing RPCs but refuses new ones. - def _stop(self): - """Stops this ForeLink. - - This method must be called for proper termination of this object, and no - attempts to exchange tickets with this object may be made after this method - has been called. - """ - with self._condition: - self._server.stop() - # TODO(nathaniel): Yep, this is weird. Deleting a server shouldn't have a - # behaviorally significant side-effect. - self._server = None - self._completion_queue.stop() - - while self._spinning: - self._condition.wait() - - self._port = None - - def __enter__(self): - """See activated.Activated.__enter__ for specification.""" - return self._start() - - def __exit__(self, exc_type, exc_val, exc_tb): - """See activated.Activated.__exit__ for specification.""" - self._stop() - return False - - def start(self): - """See activated.Activated.start for specification.""" - return self._start() - - def stop(self): - """See activated.Activated.stop for specification.""" - self._stop() - - def port(self): - """Identifies the port on which this ForeLink is servicing RPCs. - - Returns: - The number of the port on which this ForeLink is servicing RPCs, or None - if this ForeLink is not currently activated and servicing RPCs. - """ - with self._condition: - return self._port - - def accept_back_to_front_ticket(self, ticket): - """See base_interfaces.ForeLink.accept_back_to_front_ticket for spec.""" - with self._condition: - if self._server is None: - return - - if ticket.kind is base_interfaces.BackToFrontTicket.Kind.CONTINUATION: - self._continue(ticket.operation_id, ticket.payload) - elif ticket.kind is base_interfaces.BackToFrontTicket.Kind.COMPLETION: - self._complete(ticket.operation_id, ticket.payload) - else: - self._cancel(ticket.operation_id) |