aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/python/grpcio/grpc/_links/service.py43
-rw-r--r--src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py9
2 files changed, 32 insertions, 20 deletions
diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py
index 7783e91824..5c636d61ab 100644
--- a/src/python/grpcio/grpc/_links/service.py
+++ b/src/python/grpcio/grpc/_links/service.py
@@ -44,7 +44,10 @@ from grpc.framework.interfaces.links import links
@enum.unique
class _Read(enum.Enum):
READING = 'reading'
- AWAITING_ALLOWANCE = 'awaiting allowance'
+ # TODO(issue 2916): This state will again be necessary after eliminating the
+ # "early_read" field of _RPCState and going back to only reading when granted
+ # allowance to read.
+ # AWAITING_ALLOWANCE = 'awaiting allowance'
CLOSED = 'closed'
@@ -67,12 +70,15 @@ class _RPCState(object):
def __init__(
self, request_deserializer, response_serializer, sequence_number, read,
- allowance, high_write, low_write, premetadataed, terminal_metadata, code,
- message):
+ early_read, allowance, high_write, low_write, premetadataed,
+ terminal_metadata, code, message):
self.request_deserializer = request_deserializer
self.response_serializer = response_serializer
self.sequence_number = sequence_number
self.read = read
+ # TODO(issue 2916): Eliminate this by eliminating the necessity of calling
+ # call.read just to advance the RPC.
+ self.early_read = early_read # A raw (not deserialized) read.
self.allowance = allowance
self.high_write = high_write
self.low_write = low_write
@@ -120,7 +126,7 @@ class _Kernel(object):
call.read(call)
self._rpc_states[call] = _RPCState(
- request_deserializer, response_serializer, 1, _Read.READING, 0,
+ request_deserializer, response_serializer, 1, _Read.READING, None, 1,
_HighWrite.OPEN, _LowWrite.OPEN, False, None, None, None)
ticket = links.Ticket(
call, 0, group, method, links.Ticket.Subscription.FULL,
@@ -140,12 +146,15 @@ class _Kernel(object):
termination = links.Ticket.Termination.COMPLETION
else:
if 0 < rpc_state.allowance:
+ payload = rpc_state.request_deserializer(event.bytes)
+ termination = None
rpc_state.allowance -= 1
call.read(call)
else:
- rpc_state.read = _Read.AWAITING_ALLOWANCE
- payload = rpc_state.request_deserializer(event.bytes)
- termination = None
+ rpc_state.early_read = event.bytes
+ return
+ # TODO(issue 2916): Instead of returning:
+ # rpc_state.read = _Read.AWAITING_ALLOWANCE
ticket = links.Ticket(
call, rpc_state.sequence_number, None, None, None, None, None, None,
payload, None, None, None, termination)
@@ -237,12 +246,22 @@ class _Kernel(object):
rpc_state.premetadataed = True
if ticket.allowance is not None:
- if rpc_state.read is _Read.AWAITING_ALLOWANCE:
- rpc_state.allowance += ticket.allowance - 1
- call.read(call)
- rpc_state.read = _Read.READING
- else:
+ if rpc_state.early_read is None:
rpc_state.allowance += ticket.allowance
+ else:
+ payload = rpc_state.request_deserializer(rpc_state.early_read)
+ rpc_state.allowance += ticket.allowance - 1
+ rpc_state.early_read = None
+ if rpc_state.read is _Read.READING:
+ call.read(call)
+ termination = None
+ else:
+ termination = links.Ticket.Termination.COMPLETION
+ ticket = links.Ticket(
+ call, rpc_state.sequence_number, None, None, None, None, None,
+ None, payload, None, None, None, termination)
+ rpc_state.sequence_number += 1
+ self._relay.add_value(ticket)
if ticket.payload is not None:
call.write(rpc_state.response_serializer(ticket.payload), call)
diff --git a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
index 26ca035c44..1e575d1a9e 100644
--- a/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
+++ b/src/python/grpcio_test/grpc_test/framework/interfaces/links/test_cases.py
@@ -303,16 +303,9 @@ class TransmissionTest(object):
invocation_message, links.Ticket.Termination.COMPLETION)
self._invocation_link.accept_ticket(original_invocation_ticket)
- # TODO(nathaniel): This shouldn't be necessary. Detecting the end of the
- # invocation-side ticket sequence shouldn't require granting allowance for
- # another payload.
self._service_mate.block_until_tickets_satisfy(
at_least_n_payloads_received_predicate(1))
service_operation_id = self._service_mate.tickets()[0].operation_id
- self._service_link.accept_ticket(
- links.Ticket(
- service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
- None, 1, None, None, None, None, None, None))
self._service_mate.block_until_tickets_satisfy(terminated)
self._assert_is_valid_invocation_sequence(
@@ -321,7 +314,7 @@ class TransmissionTest(object):
invocation_terminal_metadata, links.Ticket.Termination.COMPLETION)
original_service_ticket = links.Ticket(
- service_operation_id, 1, None, None, links.Ticket.Subscription.FULL,
+ service_operation_id, 0, None, None, links.Ticket.Subscription.FULL,
timeout, 0, service_initial_metadata, service_payload,
service_terminal_metadata, service_code, service_message,
links.Ticket.Termination.COMPLETION)