diff options
Diffstat (limited to 'src/python')
28 files changed, 2066 insertions, 254 deletions
diff --git a/src/python/interop/interop/__init__.py b/src/python/interop/interop/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/interop/interop/__init__.py diff --git a/src/python/interop/interop/credentials/README b/src/python/interop/interop/credentials/README new file mode 100755 index 0000000000..cb20dcb49f --- /dev/null +++ b/src/python/interop/interop/credentials/README @@ -0,0 +1 @@ +These are test keys *NOT* to be used in production. diff --git a/src/python/interop/interop/credentials/server1.key b/src/python/interop/interop/credentials/server1.key new file mode 100755 index 0000000000..143a5b8765 --- /dev/null +++ b/src/python/interop/interop/credentials/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/src/python/interop/interop/credentials/server1.pem b/src/python/interop/interop/credentials/server1.pem new file mode 100755 index 0000000000..8e582e571f --- /dev/null +++ b/src/python/interop/interop/credentials/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICmzCCAgSgAwIBAgIBAzANBgkqhkiG9w0BAQUFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDDAZ0ZXN0Y2EwHhcNMTQwNzIyMDYwMDU3WhcNMjQwNzE5 +MDYwMDU3WjBkMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFDASBgNVBAoTC0dvb2dsZSBJbmMuMRowGAYDVQQDFBEqLnRl +c3QuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA4cMVJygs +JUmlgMMzgdi0h1XoCR7+ww1pop04OMMyy7H/i0PJ2W6Y35+b4CM8QrkYeEafUGDO +RYX6yV/cHGGsD/x02ye6ey1UDtkGAD/mpDEx8YCrjAc1Vfvt8Fk6Cn1WVIxV/J30 +3xjBsFgByQ55RBp1OLZfVLo6AleBDSbcxaECAwEAAaNrMGkwCQYDVR0TBAIwADAL +BgNVHQ8EBAMCBeAwTwYDVR0RBEgwRoIQKi50ZXN0Lmdvb2dsZS5mcoIYd2F0ZXJ6 +b29pLnRlc3QuZ29vZ2xlLmJlghIqLnRlc3QueW91dHViZS5jb22HBMCoAQMwDQYJ +KoZIhvcNAQEFBQADgYEAM2Ii0LgTGbJ1j4oqX9bxVcxm+/R5Yf8oi0aZqTJlnLYS +wXcBykxTx181s7WyfJ49WwrYXo78zTDAnf1ma0fPq3e4mpspvyndLh1a+OarHa1e +aT0DIIYk7qeEa1YcVljx2KyLd0r1BBAfrwyGaEPVeJQVYWaOJRU2we/KD4ojf9s= +-----END CERTIFICATE----- diff --git a/src/python/interop/interop/empty_pb2.py b/src/python/interop/interop/empty_pb2.py new file mode 100644 index 0000000000..753341c7da --- /dev/null +++ b/src/python/interop/interop/empty_pb2.py @@ -0,0 +1,60 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/empty.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/empty.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1ctest/cpp/interop/empty.proto\x12\x0cgrpc.testing\"\x07\n\x05\x45mpty') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + +_EMPTY = _descriptor.Descriptor( + name='Empty', + full_name='grpc.testing.Empty', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=46, + serialized_end=53, +) + +DESCRIPTOR.message_types_by_name['Empty'] = _EMPTY + +Empty = _reflection.GeneratedProtocolMessageType('Empty', (_message.Message,), dict( + DESCRIPTOR = _EMPTY, + __module__ = 'test.cpp.interop.empty_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.Empty) + )) +_sym_db.RegisterMessage(Empty) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/messages_pb2.py b/src/python/interop/interop/messages_pb2.py new file mode 100644 index 0000000000..79270cdf12 --- /dev/null +++ b/src/python/interop/interop/messages_pb2.py @@ -0,0 +1,444 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/messages.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf.internal import enum_type_wrapper +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/messages.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1ftest/cpp/interop/messages.proto\x12\x0cgrpc.testing\"@\n\x07Payload\x12\'\n\x04type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12\x0c\n\x04\x62ody\x18\x02 \x01(\x0c\"\xb1\x01\n\rSimpleRequest\x12\x30\n\rresponse_type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12\x15\n\rresponse_size\x18\x02 \x01(\x05\x12&\n\x07payload\x18\x03 \x01(\x0b\x32\x15.grpc.testing.Payload\x12\x15\n\rfill_username\x18\x04 \x01(\x08\x12\x18\n\x10\x66ill_oauth_scope\x18\x05 \x01(\x08\"_\n\x0eSimpleResponse\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload\x12\x10\n\x08username\x18\x02 \x01(\t\x12\x13\n\x0boauth_scope\x18\x03 \x01(\t\"C\n\x19StreamingInputCallRequest\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload\"=\n\x1aStreamingInputCallResponse\x12\x1f\n\x17\x61ggregated_payload_size\x18\x01 \x01(\x05\"7\n\x12ResponseParameters\x12\x0c\n\x04size\x18\x01 \x01(\x05\x12\x13\n\x0binterval_us\x18\x02 \x01(\x05\"\xb5\x01\n\x1aStreamingOutputCallRequest\x12\x30\n\rresponse_type\x18\x01 \x01(\x0e\x32\x19.grpc.testing.PayloadType\x12=\n\x13response_parameters\x18\x02 \x03(\x0b\x32 .grpc.testing.ResponseParameters\x12&\n\x07payload\x18\x03 \x01(\x0b\x32\x15.grpc.testing.Payload\"E\n\x1bStreamingOutputCallResponse\x12&\n\x07payload\x18\x01 \x01(\x0b\x32\x15.grpc.testing.Payload*?\n\x0bPayloadType\x12\x10\n\x0c\x43OMPRESSABLE\x10\x00\x12\x12\n\x0eUNCOMPRESSABLE\x10\x01\x12\n\n\x06RANDOM\x10\x02') +) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +_PAYLOADTYPE = _descriptor.EnumDescriptor( + name='PayloadType', + full_name='grpc.testing.PayloadType', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='COMPRESSABLE', index=0, number=0, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='UNCOMPRESSABLE', index=1, number=1, + options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RANDOM', index=2, number=2, + options=None, + type=None), + ], + containing_type=None, + options=None, + serialized_start=836, + serialized_end=899, +) +_sym_db.RegisterEnumDescriptor(_PAYLOADTYPE) + +PayloadType = enum_type_wrapper.EnumTypeWrapper(_PAYLOADTYPE) +COMPRESSABLE = 0 +UNCOMPRESSABLE = 1 +RANDOM = 2 + + + +_PAYLOAD = _descriptor.Descriptor( + name='Payload', + full_name='grpc.testing.Payload', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='type', full_name='grpc.testing.Payload.type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='body', full_name='grpc.testing.Payload.body', index=1, + number=2, type=12, cpp_type=9, label=1, + has_default_value=False, default_value=_b(""), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=49, + serialized_end=113, +) + + +_SIMPLEREQUEST = _descriptor.Descriptor( + name='SimpleRequest', + full_name='grpc.testing.SimpleRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='response_type', full_name='grpc.testing.SimpleRequest.response_type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='response_size', full_name='grpc.testing.SimpleRequest.response_size', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.SimpleRequest.payload', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='fill_username', full_name='grpc.testing.SimpleRequest.fill_username', index=3, + number=4, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='fill_oauth_scope', full_name='grpc.testing.SimpleRequest.fill_oauth_scope', index=4, + number=5, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=116, + serialized_end=293, +) + + +_SIMPLERESPONSE = _descriptor.Descriptor( + name='SimpleResponse', + full_name='grpc.testing.SimpleResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.SimpleResponse.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='username', full_name='grpc.testing.SimpleResponse.username', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='oauth_scope', full_name='grpc.testing.SimpleResponse.oauth_scope', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=295, + serialized_end=390, +) + + +_STREAMINGINPUTCALLREQUEST = _descriptor.Descriptor( + name='StreamingInputCallRequest', + full_name='grpc.testing.StreamingInputCallRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingInputCallRequest.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=392, + serialized_end=459, +) + + +_STREAMINGINPUTCALLRESPONSE = _descriptor.Descriptor( + name='StreamingInputCallResponse', + full_name='grpc.testing.StreamingInputCallResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='aggregated_payload_size', full_name='grpc.testing.StreamingInputCallResponse.aggregated_payload_size', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=461, + serialized_end=522, +) + + +_RESPONSEPARAMETERS = _descriptor.Descriptor( + name='ResponseParameters', + full_name='grpc.testing.ResponseParameters', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='size', full_name='grpc.testing.ResponseParameters.size', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='interval_us', full_name='grpc.testing.ResponseParameters.interval_us', index=1, + number=2, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=524, + serialized_end=579, +) + + +_STREAMINGOUTPUTCALLREQUEST = _descriptor.Descriptor( + name='StreamingOutputCallRequest', + full_name='grpc.testing.StreamingOutputCallRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='response_type', full_name='grpc.testing.StreamingOutputCallRequest.response_type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='response_parameters', full_name='grpc.testing.StreamingOutputCallRequest.response_parameters', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingOutputCallRequest.payload', index=2, + number=3, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=582, + serialized_end=763, +) + + +_STREAMINGOUTPUTCALLRESPONSE = _descriptor.Descriptor( + name='StreamingOutputCallResponse', + full_name='grpc.testing.StreamingOutputCallResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='payload', full_name='grpc.testing.StreamingOutputCallResponse.payload', index=0, + number=1, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + options=None), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + options=None, + is_extendable=False, + extension_ranges=[], + oneofs=[ + ], + serialized_start=765, + serialized_end=834, +) + +_PAYLOAD.fields_by_name['type'].enum_type = _PAYLOADTYPE +_SIMPLEREQUEST.fields_by_name['response_type'].enum_type = _PAYLOADTYPE +_SIMPLEREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_SIMPLERESPONSE.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGINPUTCALLREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['response_type'].enum_type = _PAYLOADTYPE +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['response_parameters'].message_type = _RESPONSEPARAMETERS +_STREAMINGOUTPUTCALLREQUEST.fields_by_name['payload'].message_type = _PAYLOAD +_STREAMINGOUTPUTCALLRESPONSE.fields_by_name['payload'].message_type = _PAYLOAD +DESCRIPTOR.message_types_by_name['Payload'] = _PAYLOAD +DESCRIPTOR.message_types_by_name['SimpleRequest'] = _SIMPLEREQUEST +DESCRIPTOR.message_types_by_name['SimpleResponse'] = _SIMPLERESPONSE +DESCRIPTOR.message_types_by_name['StreamingInputCallRequest'] = _STREAMINGINPUTCALLREQUEST +DESCRIPTOR.message_types_by_name['StreamingInputCallResponse'] = _STREAMINGINPUTCALLRESPONSE +DESCRIPTOR.message_types_by_name['ResponseParameters'] = _RESPONSEPARAMETERS +DESCRIPTOR.message_types_by_name['StreamingOutputCallRequest'] = _STREAMINGOUTPUTCALLREQUEST +DESCRIPTOR.message_types_by_name['StreamingOutputCallResponse'] = _STREAMINGOUTPUTCALLRESPONSE +DESCRIPTOR.enum_types_by_name['PayloadType'] = _PAYLOADTYPE + +Payload = _reflection.GeneratedProtocolMessageType('Payload', (_message.Message,), dict( + DESCRIPTOR = _PAYLOAD, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.Payload) + )) +_sym_db.RegisterMessage(Payload) + +SimpleRequest = _reflection.GeneratedProtocolMessageType('SimpleRequest', (_message.Message,), dict( + DESCRIPTOR = _SIMPLEREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.SimpleRequest) + )) +_sym_db.RegisterMessage(SimpleRequest) + +SimpleResponse = _reflection.GeneratedProtocolMessageType('SimpleResponse', (_message.Message,), dict( + DESCRIPTOR = _SIMPLERESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.SimpleResponse) + )) +_sym_db.RegisterMessage(SimpleResponse) + +StreamingInputCallRequest = _reflection.GeneratedProtocolMessageType('StreamingInputCallRequest', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGINPUTCALLREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingInputCallRequest) + )) +_sym_db.RegisterMessage(StreamingInputCallRequest) + +StreamingInputCallResponse = _reflection.GeneratedProtocolMessageType('StreamingInputCallResponse', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGINPUTCALLRESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingInputCallResponse) + )) +_sym_db.RegisterMessage(StreamingInputCallResponse) + +ResponseParameters = _reflection.GeneratedProtocolMessageType('ResponseParameters', (_message.Message,), dict( + DESCRIPTOR = _RESPONSEPARAMETERS, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.ResponseParameters) + )) +_sym_db.RegisterMessage(ResponseParameters) + +StreamingOutputCallRequest = _reflection.GeneratedProtocolMessageType('StreamingOutputCallRequest', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGOUTPUTCALLREQUEST, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingOutputCallRequest) + )) +_sym_db.RegisterMessage(StreamingOutputCallRequest) + +StreamingOutputCallResponse = _reflection.GeneratedProtocolMessageType('StreamingOutputCallResponse', (_message.Message,), dict( + DESCRIPTOR = _STREAMINGOUTPUTCALLRESPONSE, + __module__ = 'test.cpp.interop.messages_pb2' + # @@protoc_insertion_point(class_scope:grpc.testing.StreamingOutputCallResponse) + )) +_sym_db.RegisterMessage(StreamingOutputCallResponse) + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/interop/methods.py b/src/python/interop/interop/methods.py new file mode 100644 index 0000000000..e5ce5902ca --- /dev/null +++ b/src/python/interop/interop/methods.py @@ -0,0 +1,109 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Implementations of interoperability test methods.""" + +from grpc_early_adopter import utilities + +from interop import empty_pb2 +from interop import messages_pb2 + +def _empty_call(request): + return empty_pb2.Empty() + +EMPTY_CALL = utilities.unary_unary_rpc_method( + _empty_call, empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString, + empty_pb2.Empty.SerializeToString, empty_pb2.Empty.FromString) + + +def _unary_call(request): + return messages_pb2.SimpleResponse( + payload=messages_pb2.Payload( + type=messages_pb2.COMPRESSABLE, + body=b'\x00' * request.response_size)) + +UNARY_CALL = utilities.unary_unary_rpc_method( + _unary_call, messages_pb2.SimpleRequest.SerializeToString, + messages_pb2.SimpleRequest.FromString, + messages_pb2.SimpleResponse.SerializeToString, + messages_pb2.SimpleResponse.FromString) + + +def _streaming_output_call(request): + for response_parameters in request.response_parameters: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.response_type, + body=b'\x00' * response_parameters.size)) + +STREAMING_OUTPUT_CALL = utilities.unary_stream_rpc_method( + _streaming_output_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) + + +def _streaming_input_call(request_iterator): + aggregate_size = 0 + for request in request_iterator: + if request.payload and request.payload.body: + aggregate_size += len(request.payload.body) + return messages_pb2.StreamingInputCallResponse( + aggregated_payload_size=aggregate_size) + +STREAMING_INPUT_CALL = utilities.stream_unary_rpc_method( + _streaming_input_call, + messages_pb2.StreamingInputCallRequest.SerializeToString, + messages_pb2.StreamingInputCallRequest.FromString, + messages_pb2.StreamingInputCallResponse.SerializeToString, + messages_pb2.StreamingInputCallResponse.FromString) + + +def _full_duplex_call(request_iterator): + for request in request_iterator: + yield messages_pb2.StreamingOutputCallResponse( + payload=messages_pb2.Payload( + type=request.payload.type, + body=b'\x00' * request.response_parameters[0].size)) + +FULL_DUPLEX_CALL = utilities.stream_stream_rpc_method( + _full_duplex_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) + +# NOTE(nathaniel): Apparently this is the same as the full-duplex call? +HALF_DUPLEX_CALL = utilities.stream_stream_rpc_method( + _full_duplex_call, + messages_pb2.StreamingOutputCallRequest.SerializeToString, + messages_pb2.StreamingOutputCallRequest.FromString, + messages_pb2.StreamingOutputCallResponse.SerializeToString, + messages_pb2.StreamingOutputCallResponse.FromString) diff --git a/src/python/interop/interop/server.py b/src/python/interop/interop/server.py new file mode 100644 index 0000000000..404c87dd0a --- /dev/null +++ b/src/python/interop/interop/server.py @@ -0,0 +1,91 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""The Python implementation of the GRPC interoperability test server.""" + +import argparse +import logging +import pkg_resources +import time + +from grpc_early_adopter import implementations + +from interop import methods + +_ONE_DAY_IN_SECONDS = 60 * 60 * 24 + +_PRIVATE_KEY_RESOURCE_PATH = 'credentials/server1.key' +_CERTIFICATE_CHAIN_RESOURCE_PATH = 'credentials/server1.pem' + +_METHODS = { + '/grpc.testing.TestService/EmptyCall': methods.EMPTY_CALL, + '/grpc.testing.TestService/UnaryCall': methods.UNARY_CALL, + '/grpc.testing.TestService/StreamingOutputCall': + methods.STREAMING_OUTPUT_CALL, + '/grpc.testing.TestService/StreamingInputCall': + methods.STREAMING_INPUT_CALL, + '/grpc.testing.TestService/FullDuplexCall': + methods.FULL_DUPLEX_CALL, + '/grpc.testing.TestService/HalfDuplexCall': + methods.HALF_DUPLEX_CALL, +} + + +def serve(): + parser = argparse.ArgumentParser() + parser.add_argument( + '--port', help='the port on which to serve', type=int) + parser.add_argument( + '--use_tls', help='require a secure connection', dest='use_tls', + action='store_true') + args = parser.parse_args() + + if args.use_tls: + private_key = pkg_resources.resource_string( + __name__, _PRIVATE_KEY_RESOURCE_PATH) + certificate_chain = pkg_resources.resource_string( + __name__, _CERTIFICATE_CHAIN_RESOURCE_PATH) + server = implementations.secure_server( + _METHODS, args.port, private_key, certificate_chain) + else: + server = implementations.insecure_server( + _METHODS, args.port) + + server.start() + logging.info('Server serving.') + try: + while True: + time.sleep(_ONE_DAY_IN_SECONDS) + except BaseException as e: + logging.info('Caught exception "%s"; stopping server...', e) + server.stop() + logging.info('Server stopped; exiting.') + +if __name__ == '__main__': + serve() diff --git a/src/python/interop/interop/test_pb2.py b/src/python/interop/interop/test_pb2.py new file mode 100644 index 0000000000..1241453159 --- /dev/null +++ b/src/python/interop/interop/test_pb2.py @@ -0,0 +1,32 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: test/cpp/interop/test.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +from google.protobuf import descriptor_pb2 +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from test.cpp.interop import empty_pb2 as test_dot_cpp_dot_interop_dot_empty__pb2 +from test.cpp.interop import messages_pb2 as test_dot_cpp_dot_interop_dot_messages__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='test/cpp/interop/test.proto', + package='grpc.testing', + serialized_pb=_b('\n\x1btest/cpp/interop/test.proto\x12\x0cgrpc.testing\x1a\x1ctest/cpp/interop/empty.proto\x1a\x1ftest/cpp/interop/messages.proto2\xbb\x04\n\x0bTestService\x12\x35\n\tEmptyCall\x12\x13.grpc.testing.Empty\x1a\x13.grpc.testing.Empty\x12\x46\n\tUnaryCall\x12\x1b.grpc.testing.SimpleRequest\x1a\x1c.grpc.testing.SimpleResponse\x12l\n\x13StreamingOutputCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse0\x01\x12i\n\x12StreamingInputCall\x12\'.grpc.testing.StreamingInputCallRequest\x1a(.grpc.testing.StreamingInputCallResponse(\x01\x12i\n\x0e\x46ullDuplexCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse(\x01\x30\x01\x12i\n\x0eHalfDuplexCall\x12(.grpc.testing.StreamingOutputCallRequest\x1a).grpc.testing.StreamingOutputCallResponse(\x01\x30\x01') + , + dependencies=[test_dot_cpp_dot_interop_dot_empty__pb2.DESCRIPTOR,test_dot_cpp_dot_interop_dot_messages__pb2.DESCRIPTOR,]) +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + + + + + +# @@protoc_insertion_point(module_scope) diff --git a/src/python/interop/setup.py b/src/python/interop/setup.py new file mode 100644 index 0000000000..4b7709f234 --- /dev/null +++ b/src/python/interop/setup.py @@ -0,0 +1,51 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""A setup module for the GRPC Python interop testing package.""" + +from distutils import core as _core + +_PACKAGES = ( + 'interop', +) + +_PACKAGE_DIRECTORIES = { + 'interop': 'interop', +} + +_PACKAGE_DATA = { + 'interop': ['credentials/server1.key', 'credentials/server1.pem',] +} + +_INSTALL_REQUIRES = ['grpc-2015>=0.0.1'] + +_core.setup( + name='interop', version='0.0.1', packages=_PACKAGES, + package_dir=_PACKAGE_DIRECTORIES, package_data=_PACKAGE_DATA, + install_requires=_INSTALL_REQUIRES) diff --git a/src/python/src/_adapter/_face_test_case.py b/src/python/src/_adapter/_face_test_case.py index 112dcfb928..2c6e6286b5 100644 --- a/src/python/src/_adapter/_face_test_case.py +++ b/src/python/src/_adapter/_face_test_case.py @@ -80,7 +80,7 @@ class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): fore_link = fore.ForeLink( pool, serialization.request_deserializers, - serialization.response_serializers) + serialization.response_serializers, None, ()) port = fore_link.start() rear_link = rear.RearLink( 'localhost', port, pool, diff --git a/src/python/src/_adapter/_links_test.py b/src/python/src/_adapter/_links_test.py index 8341460a9a..d8bbb27127 100644 --- a/src/python/src/_adapter/_links_test.py +++ b/src/python/src/_adapter/_links_test.py @@ -67,7 +67,7 @@ class RoundTripTest(unittest.TestCase): test_rear_link = _test_links.RearLink(rear_action, None) fore_link = fore.ForeLink( - self.fore_link_pool, {test_method: None}, {test_method: None}) + self.fore_link_pool, {test_method: None}, {test_method: None}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) port = fore_link.start() @@ -120,7 +120,7 @@ class RoundTripTest(unittest.TestCase): fore_link = fore.ForeLink( self.fore_link_pool, {test_method: _IDENTITY}, - {test_method: _IDENTITY}) + {test_method: _IDENTITY}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) port = fore_link.start() @@ -182,7 +182,7 @@ class RoundTripTest(unittest.TestCase): fore_link = fore.ForeLink( self.fore_link_pool, {test_method: scenario.deserialize_request}, - {test_method: scenario.serialize_response}) + {test_method: scenario.serialize_response}, None, ()) fore_link.join_rear_link(test_rear_link) test_rear_link.join_fore_link(fore_link) port = fore_link.start() diff --git a/src/python/src/_adapter/_low.py b/src/python/src/_adapter/_low.py index 6c24087dad..09105eafa0 100644 --- a/src/python/src/_adapter/_low.py +++ b/src/python/src/_adapter/_low.py @@ -52,4 +52,5 @@ Call = _c.Call Channel = _c.Channel CompletionQueue = _c.CompletionQueue Server = _c.Server +ServerCredentials = _c.ServerCredentials # pylint: enable=invalid-name diff --git a/src/python/src/_adapter/_server.c b/src/python/src/_adapter/_server.c index 503be61ab4..2f8cc99e44 100644 --- a/src/python/src/_adapter/_server.c +++ b/src/python/src/_adapter/_server.c @@ -85,6 +85,19 @@ static PyObject *pygrpc_server_add_http2_addr(Server *self, PyObject *args) { return PyInt_FromLong(port); } +static PyObject *pygrpc_server_add_secure_http2_addr(Server *self, + PyObject *args) { + const char *addr; + int port; + PyArg_ParseTuple(args, "s", &addr); + port = grpc_server_add_secure_http2_port(self->c_server, addr); + if (port == 0) { + PyErr_SetString(PyExc_RuntimeError, "Couldn't add port to server!"); + return NULL; + } + return PyInt_FromLong(port); +} + static PyObject *pygrpc_server_start(Server *self) { grpc_server_start(self->c_server); @@ -118,6 +131,8 @@ static PyObject *pygrpc_server_stop(Server *self) { static PyMethodDef methods[] = { {"add_http2_addr", (PyCFunction)pygrpc_server_add_http2_addr, METH_VARARGS, "Add an HTTP2 address."}, + {"add_secure_http2_addr", (PyCFunction)pygrpc_server_add_secure_http2_addr, + METH_VARARGS, "Add a secure HTTP2 address."}, {"start", (PyCFunction)pygrpc_server_start, METH_NOARGS, "Starts the server."}, {"service", (PyCFunction)pygrpc_server_service, METH_VARARGS, diff --git a/src/python/src/_adapter/fore.py b/src/python/src/_adapter/fore.py index 2f102751f2..28aede1fd9 100644 --- a/src/python/src/_adapter/fore.py +++ b/src/python/src/_adapter/fore.py @@ -69,7 +69,8 @@ class ForeLink(ticket_interfaces.ForeLink): """A service-side bridge between RPC Framework and the C-ish _low code.""" def __init__( - self, pool, request_deserializers, response_serializers, port=None): + self, pool, request_deserializers, response_serializers, + root_certificates, key_chain_pairs, port=None): """Constructor. Args: @@ -78,6 +79,10 @@ class ForeLink(ticket_interfaces.ForeLink): deserializer behaviors. response_serializers: A dict from RPC method names to response object serializer behaviors. + root_certificates: The PEM-encoded client root certificates as a + bytestring or None. + key_chain_pairs: A sequence of PEM-encoded private key-certificate chain + pairs. port: The port on which to serve, or None to have a port selected automatically. """ @@ -85,6 +90,8 @@ class ForeLink(ticket_interfaces.ForeLink): self._pool = pool self._request_deserializers = request_deserializers self._response_serializers = response_serializers + self._root_certificates = root_certificates + self._key_chain_pairs = key_chain_pairs self._port = port self._rear_link = null.NULL_REAR_LINK @@ -264,10 +271,16 @@ class ForeLink(ticket_interfaces.ForeLink): object. """ with self._condition: + address = '[::]:%d' % (0 if self._port is None else self._port) self._completion_queue = _low.CompletionQueue() - self._server = _low.Server(self._completion_queue, None) - port = self._server.add_http2_addr( - '[::]:%d' % (0 if self._port is None else self._port)) + if self._root_certificates is None and not self._key_chain_pairs: + self._server = _low.Server(self._completion_queue, None) + port = self._server.add_http2_addr(address) + else: + server_credentials = _low.ServerCredentials( + self._root_certificates, self._key_chain_pairs) + self._server = _low.Server(self._completion_queue, server_credentials) + port = self._server.add_secure_http2_addr(address) self._server.start() self._server.service(None) diff --git a/src/python/src/_framework/base/packets/_ingestion.py b/src/python/src/_framework/base/packets/_ingestion.py index abc1e7a043..91f5a35359 100644 --- a/src/python/src/_framework/base/packets/_ingestion.py +++ b/src/python/src/_framework/base/packets/_ingestion.py @@ -183,7 +183,7 @@ class _WrappedConsumer(object): payload: A customer-significant payload object. May be None only if complete is True. complete: Whether or not the end of the payload sequence has been reached. - May be False only if payload is not None. + Must be True if payload is None. Returns: True if the wrapped consumer made progress or False if the wrapped @@ -191,13 +191,12 @@ class _WrappedConsumer(object): progress. """ try: - if payload: - if complete: - self._consumer.consume_and_terminate(payload) - else: - self._consumer.consume(payload) - else: + if payload is None: self._consumer.terminate() + elif complete: + self._consumer.consume_and_terminate(payload) + else: + self._consumer.consume(payload) return True except abandonment.Abandoned: return False diff --git a/src/python/src/_framework/face/_calls.py b/src/python/src/_framework/face/_calls.py index 9128aef7c4..a7d8be5e43 100644 --- a/src/python/src/_framework/face/_calls.py +++ b/src/python/src/_framework/face/_calls.py @@ -29,6 +29,7 @@ """Utility functions for invoking RPCs.""" +import sys import threading from _framework.base import interfaces as base_interfaces @@ -79,20 +80,46 @@ def _stream_event_subscription(result_consumer, abortion_callback): _EventServicedIngestor(result_consumer, abortion_callback)) +# NOTE(nathaniel): This class has some extremely special semantics around +# cancellation that allow it to be used by both "blocking" APIs and "futures" +# APIs. +# +# Since futures.Future defines its own exception for cancellation, we want these +# objects, when returned by methods of a returning-Futures-from-other-methods +# object, to raise the same exception for cancellation. But that's weird in a +# blocking API - why should this object, also returned by methods of blocking +# APIs, raise exceptions from the "future" module? Should we do something like +# have this class be parameterized by the type of exception that it raises in +# cancellation circumstances? +# +# We don't have to take such a dramatic step: since blocking APIs define no +# cancellation semantics whatsoever, there is no supported way for +# blocking-API-users of these objects to cancel RPCs, and thus no supported way +# for them to see an exception the type of which would be weird to them. +# +# Bonus: in both blocking and futures APIs, this object still properly raises +# exceptions.CancellationError for any *server-side cancellation* of an RPC. class _OperationCancellableIterator(interfaces.CancellableIterator): """An interfaces.CancellableIterator for response-streaming operations.""" def __init__(self, rendezvous, operation): + self._lock = threading.Lock() self._rendezvous = rendezvous self._operation = operation + self._cancelled = False def __iter__(self): return self def next(self): + with self._lock: + if self._cancelled: + raise future.CancelledError() return next(self._rendezvous) def cancel(self): + with self._lock: + self._cancelled = True self._operation.cancel() self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) @@ -105,46 +132,126 @@ class _OperationFuture(future.Future): self._rendezvous = rendezvous self._operation = operation - self._outcome = None + self._cancelled = False + self._computed = False + self._payload = None + self._exception = None + self._traceback = None self._callbacks = [] def cancel(self): """See future.Future.cancel for specification.""" with self._condition: - if self._outcome is None: + if not self._cancelled and not self._computed: self._operation.cancel() - self._outcome = future.aborted() + self._cancelled = True self._condition.notify_all() return False def cancelled(self): """See future.Future.cancelled for specification.""" - return False + with self._condition: + return self._cancelled + + def running(self): + """See future.Future.running for specification.""" + with self._condition: + return not self._cancelled and not self._computed def done(self): """See future.Future.done for specification.""" with self._condition: - return (self._outcome is not None and - self._outcome.category is not future.ABORTED) + return self._cancelled or self._computed + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._payload is None: + raise self._exception # pylint: disable=raising-bad-type + else: + return self._payload + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) - def outcome(self): - """See future.Future.outcome for specification.""" with self._condition: - while self._outcome is None: - self._condition.wait() - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" + with self._condition: + if self._cancelled: + raise future.CancelledError() + if self._computed: + return self._traceback + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._callbacks.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._condition: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() + + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._condition: if self._callbacks is not None: - self._callbacks.add(callback) + self._callbacks.add(fn) return - outcome = self._outcome - - callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callable_util.call_logging_exceptions(fn, _DONE_CALLBACK_LOG_MESSAGE, self) def on_operation_termination(self, operation_outcome): """Indicates to this object that the operation has terminated. @@ -154,34 +261,42 @@ class _OperationFuture(future.Future): outcome of the operation. """ with self._condition: - if (self._outcome is None and - operation_outcome is not base_interfaces.Outcome.COMPLETED): - self._outcome = future.raised( - _control.abortion_outcome_to_exception(operation_outcome)) - self._condition.notify_all() - - outcome = self._outcome - rendezvous = self._rendezvous - callbacks = list(self._callbacks) - self._callbacks = None - - if outcome is None: - try: - return_value = next(rendezvous) - except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + cancelled = self._cancelled + if cancelled: + callbacks = list(self._callbacks) + self._callbacks = None else: - outcome = future.returned(return_value) + rendezvous = self._rendezvous + + if not cancelled: + payload = None + exception = None + traceback = None + if operation_outcome == base_interfaces.Outcome.COMPLETED: + try: + payload = next(rendezvous) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] + else: + try: + # We raise and then immediately catch in order to create a traceback. + raise _control.abortion_outcome_to_exception(operation_outcome) + except Exception as e: # pylint: disable=broad-except + exception = e + traceback = sys.exc_info()[2] with self._condition: - if self._outcome is None: - self._outcome = outcome - self._condition.notify_all() - else: - outcome = self._outcome + if not self._cancelled: + self._computed = True + self._payload = payload + self._exception = exception + self._traceback = traceback + callbacks = list(self._callbacks) + self._callbacks = None for callback in callbacks: callable_util.call_logging_exceptions( - callback, _DONE_CALLBACK_LOG_MESSAGE, outcome) + callback, _DONE_CALLBACK_LOG_MESSAGE, self) class _Call(interfaces.Call): diff --git a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index cf8b2eeb95..939b238b66 100644 --- a/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/_framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -116,7 +116,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(request, response, self) @@ -144,7 +144,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( with request_iterator.pause(): response_future = self.stub.future_stream_in_value_out( name, request_iterator, _TIMEOUT) - response = response_future.outcome().return_value + response = response_future.result() test_messages.verify(requests, response, self) @@ -173,13 +173,13 @@ class FutureInvocationAsynchronousEventServiceTestCase( first_response_future = self.stub.future_value_in_value_out( name, first_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value + first_response = first_response_future.result() test_messages.verify(first_request, first_response, self) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - second_response = second_response_future.outcome().return_value + second_response = second_response_future.result() test_messages.verify(second_request, second_response, self) @@ -192,10 +192,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -203,11 +203,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: request = test_messages.request() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_value_in_stream_out( name, request, _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -218,10 +218,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - - self.assertIsInstance( - outcome.exception, exceptions.ExpirationError) + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -229,11 +229,11 @@ class FutureInvocationAsynchronousEventServiceTestCase( for test_messages in test_messages_sequence: requests = test_messages.requests() - with self.control.pause(), self.assertRaises( - exceptions.ExpirationError): + with self.control.pause(): response_iterator = self.stub.inline_stream_in_stream_out( name, iter(requests), _TIMEOUT) - list(response_iterator) + with self.assertRaises(exceptions.ExpirationError): + list(response_iterator) def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -244,13 +244,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -276,13 +278,15 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.fail(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - outcome = response_future.outcome() - # Because the servicer fails outside of the thread from which the - # servicer-side runtime called into it its failure is indistinguishable - # from simply not having called its response_callback before the - # expiration of the RPC. - self.assertIsInstance(outcome.exception, exceptions.ExpirationError) + # Because the servicer fails outside of the thread from which the + # servicer-side runtime called into it its failure is + # indistinguishable from simply not having called its + # response_callback before the expiration of the RPC. + self.assertIsInstance( + response_future.exception(), exceptions.ExpirationError) + with self.assertRaises(exceptions.ExpirationError): + response_future.result() def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -310,8 +314,8 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, first_request, _TIMEOUT) second_response_future = self.stub.future_value_in_value_out( name, second_request, _TIMEOUT) - first_response = first_response_future.outcome().return_value - second_response = second_response_future.outcome().return_value + first_response = first_response_future.result() + second_response = second_response_future.result() test_messages.verify(first_request, first_response, self) test_messages.verify(second_request, second_response, self) @@ -329,10 +333,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_value_in_value_out( name, request, _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -345,7 +349,7 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, request, _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) def testCancelledStreamRequestUnaryResponse(self): @@ -357,10 +361,10 @@ class FutureInvocationAsynchronousEventServiceTestCase( with self.control.pause(): response_future = self.stub.future_stream_in_value_out( name, iter(requests), _TIMEOUT) - cancelled = response_future.cancel() + cancel_method_return_value = response_future.cancel() - self.assertFalse(cancelled) - self.assertEqual(future.ABORTED, response_future.outcome().category) + self.assertFalse(cancel_method_return_value) + self.assertTrue(response_future.cancelled()) def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -373,5 +377,5 @@ class FutureInvocationAsynchronousEventServiceTestCase( name, iter(requests), _TIMEOUT) response_iterator.cancel() - with self.assertRaises(exceptions.CancellationError): + with self.assertRaises(future.CancelledError): next(response_iterator) diff --git a/src/python/src/_framework/foundation/_later_test.py b/src/python/src/_framework/foundation/_later_test.py index fbd17a4ad9..50b67907db 100644 --- a/src/python/src/_framework/foundation/_later_test.py +++ b/src/python/src/_framework/foundation/_later_test.py @@ -33,7 +33,6 @@ import threading import time import unittest -from _framework.foundation import future from _framework.foundation import later TICK = 0.1 @@ -44,10 +43,14 @@ class LaterTest(unittest.TestCase): def test_simple_delay(self): lock = threading.Lock() cell = [0] - def increment_cell(): + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + return return_value + computation_future = later.later(TICK * 2, computation) + self.assertFalse(computation_future.done()) self.assertFalse(computation_future.cancelled()) time.sleep(TICK) @@ -60,22 +63,21 @@ class LaterTest(unittest.TestCase): self.assertFalse(computation_future.cancelled()) with lock: self.assertEqual(1, cell[0]) - outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, outcome.category) + self.assertEqual(return_value, computation_future.result()) def test_callback(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: @@ -83,63 +85,67 @@ class LaterTest(unittest.TestCase): time.sleep(TICK * 2) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) callback_called[0] = False - outcome_passed_to_callback[0] = None + future_passed_to_callback[0] = None computation_future.add_done_callback(callback) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].done()) def test_cancel(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback = [None] + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) + computation_future = later.later(TICK * 2, computation) def callback(outcome): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback[0] = outcome computation_future.add_done_callback(callback) time.sleep(TICK) with lock: self.assertFalse(callback_called[0]) computation_future.cancel() self.assertTrue(computation_future.cancelled()) - self.assertFalse(computation_future.done()) - self.assertEqual(future.ABORTED, computation_future.outcome().category) + self.assertFalse(computation_future.running()) + self.assertTrue(computation_future.done()) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.ABORTED, outcome_passed_to_callback[0].category) + self.assertTrue(future_passed_to_callback[0].cancelled()) - def test_outcome(self): + def test_result(self): lock = threading.Lock() cell = [0] callback_called = [False] - outcome_passed_to_callback = [None] - def increment_cell(): + future_passed_to_callback_cell = [None] + return_value = object() + + def computation(): with lock: cell[0] += 1 - computation_future = later.later(TICK * 2, increment_cell) - def callback(outcome): + return return_value + computation_future = later.later(TICK * 2, computation) + + def callback(future_passed_to_callback): with lock: callback_called[0] = True - outcome_passed_to_callback[0] = outcome + future_passed_to_callback_cell[0] = future_passed_to_callback computation_future.add_done_callback(callback) - returned_outcome = computation_future.outcome() - self.assertEqual(future.RETURNED, returned_outcome.category) + returned_value = computation_future.result() + self.assertEqual(return_value, returned_value) # The callback may not yet have been called! Sleep a tick. time.sleep(TICK) with lock: self.assertTrue(callback_called[0]) - self.assertEqual(future.RETURNED, outcome_passed_to_callback[0].category) + self.assertEqual(return_value, future_passed_to_callback_cell[0].result()) if __name__ == '__main__': unittest.main() diff --git a/src/python/src/_framework/foundation/_timer_future.py b/src/python/src/_framework/foundation/_timer_future.py index 86bc073d56..4aa66991c5 100644 --- a/src/python/src/_framework/foundation/_timer_future.py +++ b/src/python/src/_framework/foundation/_timer_future.py @@ -29,6 +29,7 @@ """Affords a Future implementation based on Python's threading.Timer.""" +import sys import threading import time @@ -52,7 +53,9 @@ class TimerFuture(future.Future): self._computing = False self._computed = False self._cancelled = False - self._outcome = None + self._result = None + self._exception = None + self._traceback = None self._waiting = [] def _compute(self): @@ -70,19 +73,24 @@ class TimerFuture(future.Future): self._computing = True try: - returned_value = self._computation() - outcome = future.returned(returned_value) + return_value = self._computation() + exception = None + traceback = None except Exception as e: # pylint: disable=broad-except - outcome = future.raised(e) + return_value = None + exception = e + traceback = sys.exc_info()[2] with self._lock: self._computing = False self._computed = True - self._outcome = outcome + self._return_value = return_value + self._exception = exception + self._traceback = traceback waiting = self._waiting for callback in waiting: - callback(outcome) + callback(self) def start(self): """Starts this Future. @@ -104,13 +112,11 @@ class TimerFuture(future.Future): else: self._timer.cancel() self._cancelled = True - self._outcome = future.aborted() - outcome = self._outcome waiting = self._waiting for callback in waiting: try: - callback(outcome) + callback(self) except Exception: # pylint: disable=broad-except pass @@ -121,36 +127,102 @@ class TimerFuture(future.Future): with self._lock: return self._cancelled + def running(self): + """See future.Future.running for specification.""" + with self._lock: + return not self._computed and not self._cancelled + def done(self): """See future.Future.done for specification.""" with self._lock: - return self._computed + return self._computed or self._cancelled + + def result(self, timeout=None): + """See future.Future.result for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + if self._exception is None: + return self._return_value + else: + raise self._exception # pylint: disable=raising-bad-type + else: + raise future.TimeoutError() + + def exception(self, timeout=None): + """See future.Future.exception for specification.""" + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + + condition = threading.Condition() + def notify_condition(unused_future): + with condition: + condition.notify() + self._waiting.append(notify_condition) + + with condition: + condition.wait(timeout=timeout) + + with self._lock: + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._exception + else: + raise future.TimeoutError() - def outcome(self): - """See future.Future.outcome for specification.""" + def traceback(self, timeout=None): + """See future.Future.traceback for specification.""" with self._lock: - if self._computed or self._cancelled: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback condition = threading.Condition() - def notify_condition(unused_outcome): + def notify_condition(unused_future): with condition: condition.notify() self._waiting.append(notify_condition) with condition: - condition.wait() + condition.wait(timeout=timeout) with self._lock: - return self._outcome + if self._cancelled: + raise future.CancelledError() + elif self._computed: + return self._traceback + else: + raise future.TimeoutError() - def add_done_callback(self, callback): + def add_done_callback(self, fn): """See future.Future.add_done_callback for specification.""" with self._lock: if not self._computed and not self._cancelled: - self._waiting.append(callback) + self._waiting.append(fn) return - else: - outcome = self._outcome - callback(outcome) + fn(self) diff --git a/src/python/src/_framework/foundation/callable_util.py b/src/python/src/_framework/foundation/callable_util.py index 1f7546cb76..32b0751a01 100644 --- a/src/python/src/_framework/foundation/callable_util.py +++ b/src/python/src/_framework/foundation/callable_util.py @@ -29,18 +29,47 @@ """Utilities for working with callables.""" +import abc +import collections +import enum import functools import logging -from _framework.foundation import future + +class Outcome(object): + """A sum type describing the outcome of some call. + + Attributes: + kind: One of Kind.RETURNED or Kind.RAISED respectively indicating that the + call returned a value or raised an exception. + return_value: The value returned by the call. Must be present if kind is + Kind.RETURNED. + exception: The exception raised by the call. Must be present if kind is + Kind.RAISED. + """ + __metaclass__ = abc.ABCMeta + + @enum.unique + class Kind(enum.Enum): + """Identifies the general kind of the outcome of some call.""" + + RETURNED = object() + RAISED = object() + + +class _EasyOutcome( + collections.namedtuple( + '_EasyOutcome', ['kind', 'return_value', 'exception']), + Outcome): + """A trivial implementation of Outcome.""" def _call_logging_exceptions(behavior, message, *args, **kwargs): try: - return future.returned(behavior(*args, **kwargs)) + return _EasyOutcome(Outcome.Kind.RETURNED, behavior(*args, **kwargs), None) except Exception as e: # pylint: disable=broad-except logging.exception(message) - return future.raised(e) + return _EasyOutcome(Outcome.Kind.RAISED, None, e) def with_exceptions_logged(behavior, message): @@ -72,7 +101,7 @@ def call_logging_exceptions(behavior, message, *args, **kwargs): **kwargs: Keyword arguments to pass to the given behavior. Returns: - A future.Outcome describing whether the given behavior returned a value or - raised an exception. + An Outcome describing whether the given behavior returned a value or raised + an exception. """ return _call_logging_exceptions(behavior, message, *args, **kwargs) diff --git a/src/python/src/_framework/foundation/future.py b/src/python/src/_framework/foundation/future.py index f00c503257..bfc16fc1ea 100644 --- a/src/python/src/_framework/foundation/future.py +++ b/src/python/src/_framework/foundation/future.py @@ -27,146 +27,210 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""The Future interface missing from Python's standard library. +"""A Future interface. -Python's concurrent.futures library defines a Future class very much like the -Future defined here, but since that class is concrete and without construction -semantics it is only available within the concurrent.futures library itself. -The Future class defined here is an entirely abstract interface that anyone may +Python doesn't have a Future interface in its standard library. In the absence +of such a standard, three separate, incompatible implementations +(concurrent.futures.Future, ndb.Future, and asyncio.Future) have appeared. This +interface attempts to be as compatible as possible with +concurrent.futures.Future. From ndb.Future it adopts a traceback-object accessor +method. + +Unlike the concrete and implemented Future classes listed above, the Future +class defined in this module is an entirely abstract interface that anyone may implement and use. + +The one known incompatibility between this interface and the interface of +concurrent.futures.Future is that this interface defines its own CancelledError +and TimeoutError exceptions rather than raising the implementation-private +concurrent.futures._base.CancelledError and the +built-in-but-only-in-3.3-and-later TimeoutError. """ import abc -import collections - -RETURNED = object() -RAISED = object() -ABORTED = object() - -class Outcome(object): - """A sum type describing the outcome of some computation. - - Attributes: - category: One of RETURNED, RAISED, or ABORTED, respectively indicating - that the computation returned a value, raised an exception, or was - aborted. - return_value: The value returned by the computation. Must be present if - category is RETURNED. - exception: The exception raised by the computation. Must be present if - category is RAISED. - """ - __metaclass__ = abc.ABCMeta +class TimeoutError(Exception): + """Indicates that a particular call timed out.""" -class _EasyOutcome( - collections.namedtuple('_EasyOutcome', - ['category', 'return_value', 'exception']), - Outcome): - """A trivial implementation of Outcome.""" -# All Outcomes describing abortion are indistinguishable so there might as well -# be only one. -_ABORTED_OUTCOME = _EasyOutcome(ABORTED, None, None) +class CancelledError(Exception): + """Indicates that the computation underlying a Future was cancelled.""" -def aborted(): - """Returns an Outcome indicating that a computation was aborted. +class Future(object): + """A representation of a computation in another control flow. - Returns: - An Outcome indicating that a computation was aborted. + Computations represented by a Future may be yet to be begun, may be ongoing, + or may have already completed. """ - return _ABORTED_OUTCOME - - -def raised(exception): - """Returns an Outcome indicating that a computation raised an exception. - - Args: - exception: The exception raised by the computation. + __metaclass__ = abc.ABCMeta - Returns: - An Outcome indicating that a computation raised the given exception. - """ - return _EasyOutcome(RAISED, None, exception) + # NOTE(nathaniel): This isn't the return type that I would want to have if it + # were up to me. Were this interface being written from scratch, the return + # type of this method would probably be a sum type like: + # + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT<Partial_Result_Type> + # COMPLETED<Result_Type> + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + @abc.abstractmethod + def cancel(self): + """Attempts to cancel the computation. + This method does not block. -def returned(value): - """Returns an Outcome indicating that a computation returned a value. + Returns: + True if the computation has not yet begun, will not be allowed to take + place, and determination of both was possible without blocking. False + under all other circumstances including but not limited to the + computation's already having begun, the computation's already having + finished, and the computation's having been scheduled for execution on a + remote system for which a determination of whether or not it commenced + before being cancelled cannot be made without blocking. + """ + raise NotImplementedError() - Args: - value: The value returned by the computation. + # NOTE(nathaniel): Here too this isn't the return type that I'd want this + # method to have if it were up to me. I think I'd go with another sum type + # like: + # + # NOT_CANCELLED (this object's cancel method hasn't been called) + # NOT_COMMENCED + # COMMENCED_AND_NOT_COMPLETED + # PARTIAL_RESULT<Partial_Result_Type> + # COMPLETED<Result_Type> + # UNCANCELLABLE + # NOT_IMMEDIATELY_DETERMINABLE + # + # Notice how giving the cancel method the right semantics obviates most + # reasons for this method to exist. + @abc.abstractmethod + def cancelled(self): + """Describes whether the computation was cancelled. - Returns: - An Outcome indicating that a computation returned the given value. - """ - return _EasyOutcome(RETURNED, value, None) + This method does not block. + Returns: + True if the computation was cancelled any time before its result became + immediately available. False under all other circumstances including but + not limited to this object's cancel method not having been called and + the computation's result having become immediately available. + """ + raise NotImplementedError() -class Future(object): - """A representation of a computation happening in another control flow. + @abc.abstractmethod + def running(self): + """Describes whether the computation is taking place. - Computations represented by a Future may have already completed, may be - ongoing, or may be yet to be begun. + This method does not block. - Computations represented by a Future are considered uninterruptable; once - started they will be allowed to terminate either by returning or raising - an exception. - """ - __metaclass__ = abc.ABCMeta + Returns: + True if the computation is scheduled to take place in the future or is + taking place now, or False if the computation took place in the past or + was cancelled. + """ + raise NotImplementedError() + # NOTE(nathaniel): These aren't quite the semantics I'd like here either. I + # would rather this only returned True in cases in which the underlying + # computation completed successfully. A computation's having been cancelled + # conflicts with considering that computation "done". @abc.abstractmethod - def cancel(self): - """Attempts to cancel the computation. + def done(self): + """Describes whether the computation has taken place. + + This method does not block. Returns: - True if the computation will not be allowed to take place or False if - the computation has already taken place or is currently taking place. + True if the computation is known to have either completed or have been + unscheduled or interrupted. False if the computation may possibly be + executing or scheduled to execute later. """ raise NotImplementedError() @abc.abstractmethod - def cancelled(self): - """Describes whether the computation was cancelled. + def result(self, timeout=None): + """Accesses the outcome of the computation or raises its exception. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + finish or be cancelled, or None if this method should block until the + computation has finished or is cancelled no matter how long that takes. Returns: - True if the computation was cancelled and did not take place or False - if the computation took place, is taking place, or is scheduled to - take place in the future. + The return value of the computation. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. + Exception: If the computation raised an exception, this call will raise + the same exception. """ raise NotImplementedError() @abc.abstractmethod - def done(self): - """Describes whether the computation has taken place. + def exception(self, timeout=None): + """Return the exception raised by the computation. + + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - True if the computation took place; False otherwise. + The exception raised by the computation, or None if the computation did + not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def outcome(self): - """Accesses the outcome of the computation. + def traceback(self, timeout=None): + """Access the traceback of the exception raised by the computation. - If the computation has not yet completed, this method blocks until it has. + This method may return immediately or may block. + + Args: + timeout: The length of time in seconds to wait for the computation to + terminate or be cancelled, or None if this method should block until + the computation is terminated or is cancelled no matter how long that + takes. Returns: - An Outcome describing the outcome of the computation. + The traceback of the exception raised by the computation, or None if the + computation did not raise an exception. + + Raises: + TimeoutError: If a timeout value is passed and the computation does not + terminate within the allotted time. + CancelledError: If the computation was cancelled. """ raise NotImplementedError() @abc.abstractmethod - def add_done_callback(self, callback): + def add_done_callback(self, fn): """Adds a function to be called at completion of the computation. - The callback will be passed an Outcome object describing the outcome of + The callback will be passed this Future object describing the outcome of the computation. If the computation has already completed, the callback will be called immediately. Args: - callback: A callable taking an Outcome as its single parameter. + fn: A callable taking a this Future object as its single parameter. """ raise NotImplementedError() diff --git a/src/python/src/grpc_early_adopter/__init__.py b/src/python/src/grpc_early_adopter/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/src/python/src/grpc_early_adopter/__init__.py diff --git a/src/python/src/grpc_early_adopter/_face_utilities.py b/src/python/src/grpc_early_adopter/_face_utilities.py new file mode 100644 index 0000000000..8b10be729b --- /dev/null +++ b/src/python/src/grpc_early_adopter/_face_utilities.py @@ -0,0 +1,143 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import abc +import collections + +from _framework.face import interfaces as face_interfaces + +from grpc_early_adopter import interfaces + + +class _InlineUnaryUnaryMethod(face_interfaces.InlineValueInValueOutMethod): + + def __init__(self, unary_unary_rpc_method): + self._method = unary_unary_rpc_method + + def service(self, request, context): + """See face_interfaces.InlineValueInValueOutMethod.service for spec.""" + return self._method.service_unary_unary(request) + + +class _InlineUnaryStreamMethod(face_interfaces.InlineValueInStreamOutMethod): + + def __init__(self, unary_stream_rpc_method): + self._method = unary_stream_rpc_method + + def service(self, request, context): + """See face_interfaces.InlineValueInStreamOutMethod.service for spec.""" + return self._method.service_unary_stream(request) + + +class _InlineStreamUnaryMethod(face_interfaces.InlineStreamInValueOutMethod): + + def __init__(self, stream_unary_rpc_method): + self._method = stream_unary_rpc_method + + def service(self, request_iterator, context): + """See face_interfaces.InlineStreamInValueOutMethod.service for spec.""" + return self._method.service_stream_unary(request_iterator) + + +class _InlineStreamStreamMethod(face_interfaces.InlineStreamInStreamOutMethod): + + def __init__(self, stream_stream_rpc_method): + self._method = stream_stream_rpc_method + + def service(self, request_iterator, context): + """See face_interfaces.InlineStreamInStreamOutMethod.service for spec.""" + return self._method.service_stream_stream(request_iterator) + + +class Breakdown(object): + """An intermediate representation of implementations of RPC methods. + + Attributes: + unary_unary_methods: + unary_stream_methods: + stream_unary_methods: + stream_stream_methods: + request_serializers: + request_deserializers: + response_serializers: + response_deserializers: + """ + __metaclass__ = abc.ABCMeta + + + +class _EasyBreakdown( + Breakdown, + collections.namedtuple( + '_EasyBreakdown', + ['unary_unary_methods', 'unary_stream_methods', 'stream_unary_methods', + 'stream_stream_methods', 'request_serializers', + 'request_deserializers', 'response_serializers', + 'response_deserializers'])): + pass + + +def break_down(methods): + """Breaks down RPC methods. + + Args: + methods: A dictionary from RPC mthod name to + interfaces.RpcMethod object describing the RPCs. + + Returns: + A Breakdown corresponding to the given methods. + """ + unary_unary = {} + unary_stream = {} + stream_unary = {} + stream_stream = {} + request_serializers = {} + request_deserializers = {} + response_serializers = {} + response_deserializers = {} + + for name, method in methods.iteritems(): + cardinality = method.cardinality() + if cardinality is interfaces.Cardinality.UNARY_UNARY: + unary_unary[name] = _InlineUnaryUnaryMethod(method) + elif cardinality is interfaces.Cardinality.UNARY_STREAM: + unary_stream[name] = _InlineUnaryStreamMethod(method) + elif cardinality is interfaces.Cardinality.STREAM_UNARY: + stream_unary[name] = _InlineStreamUnaryMethod(method) + elif cardinality is interfaces.Cardinality.STREAM_STREAM: + stream_stream[name] = _InlineStreamStreamMethod(method) + request_serializers[name] = method.serialize_request + request_deserializers[name] = method.deserialize_request + response_serializers[name] = method.serialize_response + response_deserializers[name] = method.deserialize_response + + return _EasyBreakdown( + unary_unary, unary_stream, stream_unary, stream_stream, + request_serializers, request_deserializers, response_serializers, + response_deserializers) diff --git a/src/python/src/grpc_early_adopter/implementations.py b/src/python/src/grpc_early_adopter/implementations.py new file mode 100644 index 0000000000..8a2f7fde61 --- /dev/null +++ b/src/python/src/grpc_early_adopter/implementations.py @@ -0,0 +1,129 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Entry points into GRPC.""" + +import threading + +from _adapter import fore +from _framework.base.packets import implementations as _tickets_implementations +from _framework.face import implementations as _face_implementations +from _framework.foundation import logging_pool +from grpc_early_adopter import _face_utilities +from grpc_early_adopter import interfaces + +_MEGA_TIMEOUT = 60 * 60 * 24 +_THREAD_POOL_SIZE = 80 + + +class _Server(interfaces.Server): + + def __init__(self, breakdown, port, private_key, certificate_chain): + self._lock = threading.Lock() + self._breakdown = breakdown + self._port = port + self._private_key = private_key + self._certificate_chain = certificate_chain + + self._pool = None + self._fore_link = None + self._back = None + + def start(self): + """See interfaces.Server.start for specification.""" + with self._lock: + if self._pool is None: + self._pool = logging_pool.pool(_THREAD_POOL_SIZE) + servicer = _face_implementations.servicer( + self._pool, + inline_value_in_value_out_methods=self._breakdown.unary_unary_methods, + inline_value_in_stream_out_methods=self._breakdown.unary_stream_methods, + inline_stream_in_value_out_methods=self._breakdown.stream_unary_methods, + inline_stream_in_stream_out_methods=self._breakdown.stream_stream_methods) + self._fore_link = fore.ForeLink( + self._pool, self._breakdown.request_deserializers, + self._breakdown.response_serializers, None, + ((self._private_key, self._certificate_chain),), port=self._port) + port = self._fore_link.start() + self._back = _tickets_implementations.back( + servicer, self._pool, self._pool, self._pool, _MEGA_TIMEOUT, + _MEGA_TIMEOUT) + self._fore_link.join_rear_link(self._back) + self._back.join_fore_link(self._fore_link) + return port + else: + raise ValueError('Server currently running!') + + def stop(self): + """See interfaces.Server.stop for specification.""" + with self._lock: + if self._pool is None: + raise ValueError('Server not running!') + else: + self._fore_link.stop() + self._pool.shutdown(wait=True) + self._pool = None + + +def _build_server(methods, port, private_key, certificate_chain): + breakdown = _face_utilities.break_down(methods) + return _Server(breakdown, port, private_key, certificate_chain) + + +def insecure_server(methods, port): + """Constructs an insecure interfaces.Server. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethod object describing the RPCs to be + serviced by the created server. + port: The port on which to serve. + + Returns: + An interfaces.Server that will run with no security and + service unsecured raw requests. + """ + return _build_server(methods, port, None, None) + + +def secure_server(methods, port, private_key, certificate_chain): + """Constructs a secure interfaces.Server. + + Args: + methods: A dictionary from RPC method name to + interfaces.RpcMethod object describing the RPCs to be + serviced by the created server. + port: The port on which to serve. + private_key: A pem-encoded private key. + certificate_chain: A pem-encoded certificate chain. + + Returns: + An interfaces.Server that will serve secure traffic. + """ + return _build_server(methods, port, private_key, certificate_chain) diff --git a/src/python/src/grpc_early_adopter/interfaces.py b/src/python/src/grpc_early_adopter/interfaces.py new file mode 100644 index 0000000000..c2806c235c --- /dev/null +++ b/src/python/src/grpc_early_adopter/interfaces.py @@ -0,0 +1,194 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Interfaces of GRPC.""" + +import abc +import enum + + +@enum.unique +class Cardinality(enum.Enum): + """Constants for the four cardinalities of RPC.""" + + UNARY_UNARY = 'request-unary/response-unary' + UNARY_STREAM = 'request-unary/response-streaming' + STREAM_UNARY = 'request-streaming/response-unary' + STREAM_STREAM = 'request-streaming/response-streaming' + + +class RpcMethod(object): + """A sum type for the implementation of an RPC method.""" + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def cardinality(self): + """Identifies the cardinality of this RpcMethod. + + Returns: + A Cardinality value identifying whether or not this + RpcMethod is request-unary or request-streaming and + whether or not it is response-unary or + response-streaming. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_request(self, request): + """Serializes a request value. + + Args: + request: A request value appropriate for this RpcMethod. + + Returns: + The serialization of the given request value as a + bytestring. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_request(self, serialized_request): + """Deserializes a request value. + + Args: + serialized_request: A bytestring that is the + serialization of a request value appropriate for this + RpcMethod. + + Returns: + A request value corresponding to the given bytestring. + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_response(self, response): + """Serializes a response value. + + Args: + response: A response value appropriate for this RpcMethod. + + Returns: + The serialization of the given response value as a + bytestring. + """ + raise NotImplementedError() + + @abc.abstractmethod + def deserialize_response(self, serialized_response): + """Deserializes a response value. + + Args: + serialized_response: A bytestring that is the + serialization of a response value appropriate for this + RpcMethod. + + Returns: + A response value corresponding to the given bytestring. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_unary_unary(self, request): + """Carries out this RPC. + + This method may only be called if the cardinality of this + RpcMethod is Cardinality.UNARY_UNARY. + + Args: + request: A request value appropriate for this RpcMethod. + + Returns: + A response value appropriate for this RpcMethod. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_unary_stream(self, request): + """Carries out this RPC. + + This method may only be called if the cardinality of this + RpcMethod is Cardinality.UNARY_STREAM. + + Args: + request: A request value appropriate for this RpcMethod. + + Yields: + Zero or more response values appropriate for this + RpcMethod. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_stream_unary(self, request_iterator): + """Carries out this RPC. + + This method may only be called if the cardinality of this + RpcMethod is Cardinality.STREAM_UNARY. + + Args: + request_iterator: An iterator of request values + appropriate for this RpcMethod. + + Returns: + A response value appropriate for this RpcMethod. + """ + raise NotImplementedError() + + @abc.abstractmethod + def service_stream_stream(self, request_iterator): + """Carries out this RPC. + + This method may only be called if the cardinality of this + RpcMethod is Cardinality.STREAM_STREAM. + + Args: + request_iterator: An iterator of request values + appropriate for this RpcMethod. + + Yields: + Zero or more response values appropraite for this + RpcMethod. + """ + raise NotImplementedError() + + +class Server(object): + """A GRPC Server.""" + __metaclass__ = abc.ABCMeta + + + @abc.abstractmethod + def start(self): + """Instructs this server to commence service of RPCs.""" + raise NotImplementedError() + + @abc.abstractmethod + def stop(self): + """Instructs this server to halt service of RPCs.""" + raise NotImplementedError() diff --git a/src/python/src/grpc_early_adopter/utilities.py b/src/python/src/grpc_early_adopter/utilities.py new file mode 100644 index 0000000000..333ed3a9db --- /dev/null +++ b/src/python/src/grpc_early_adopter/utilities.py @@ -0,0 +1,213 @@ +# Copyright 2015, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +"""Utilities for use with GRPC.""" + +from grpc_early_adopter import interfaces + + +class _RpcMethod(interfaces.RpcMethod): + + def __init__( + self, cardinality, unary_unary, unary_stream, stream_unary, + stream_stream, request_serializer, request_deserializer, + response_serializer, response_deserializer): + self._cardinality = cardinality + self._unary_unary = unary_unary + self._unary_stream = unary_stream + self._stream_unary = stream_unary + self._stream_stream = stream_stream + self._request_serializer = request_serializer + self._request_deserializer = request_deserializer + self._response_serializer = response_serializer + self._response_deserializer = response_deserializer + + def cardinality(self): + """See interfaces.RpcMethod.cardinality for specification.""" + return self._cardinality + + def serialize_request(self, request): + """See interfaces.RpcMethod.serialize_request for specification.""" + return self._request_serializer(request) + + def deserialize_request(self, serialized_request): + """See interfaces.RpcMethod.deserialize_request for specification.""" + return self._request_deserializer(serialized_request) + + def serialize_response(self, response): + """See interfaces.RpcMethod.serialize_response for specification.""" + return self._response_serializer(response) + + def deserialize_response(self, serialized_response): + """See interfaces.RpcMethod.deserialize_response for specification.""" + return self._response_deserializer(serialized_response) + + def service_unary_unary(self, request): + """See interfaces.RpcMethod.service_unary_unary for specification.""" + return self._unary_unary(request) + + def service_unary_stream(self, request): + """See interfaces.RpcMethod.service_unary_stream for specification.""" + return self._unary_stream(request) + + def service_stream_unary(self, request_iterator): + """See interfaces.RpcMethod.service_stream_unary for specification.""" + return self._stream_unary(request_iterator) + + def service_stream_stream(self, request_iterator): + """See interfaces.RpcMethod.service_stream_stream for specification.""" + return self._stream_stream(request_iterator) + + +def unary_unary_rpc_method( + behavior, request_serializer, request_deserializer, response_serializer, + response_deserializer): + """Constructs an interfaces.RpcMethod for the given behavior. + + Args: + behavior: A callable that implements a unary-unary RPC + method that accepts a single request and returns a single + response. + request_serializer: A callable that when called on a request + value returns a bytestring corresponding to that value. + request_deserializer: A callable that when called on a + bytestring returns the request value corresponding to that + bytestring. + response_serializer: A callable that when called on a + response value returns the bytestring corresponding to + that value. + response_deserializer: A callable that when called on a + bytestring returns the response value corresponding to + that bytestring. + + Returns: + An interfaces.RpcMethod constructed from the given + arguments representing a unary-request/unary-response RPC + method. + """ + return _RpcMethod( + interfaces.Cardinality.UNARY_UNARY, behavior, None, None, None, + request_serializer, request_deserializer, response_serializer, + response_deserializer) + + +def unary_stream_rpc_method( + behavior, request_serializer, request_deserializer, response_serializer, + response_deserializer): + """Constructs an interfaces.RpcMethod for the given behavior. + + Args: + behavior: A callable that implements a unary-stream RPC + method that accepts a single request and returns an + iterator of zero or more responses. + request_serializer: A callable that when called on a request + value returns a bytestring corresponding to that value. + request_deserializer: A callable that when called on a + bytestring returns the request value corresponding to that + bytestring. + response_serializer: A callable that when called on a + response value returns the bytestring corresponding to + that value. + response_deserializer: A callable that when called on a + bytestring returns the response value corresponding to + that bytestring. + + Returns: + An interfaces.RpcMethod constructed from the given + arguments representing a unary-request/streaming-response + RPC method. + """ + return _RpcMethod( + interfaces.Cardinality.UNARY_STREAM, None, behavior, None, None, + request_serializer, request_deserializer, response_serializer, + response_deserializer) + + +def stream_unary_rpc_method( + behavior, request_serializer, request_deserializer, response_serializer, + response_deserializer): + """Constructs an interfaces.RpcMethod for the given behavior. + + Args: + behavior: A callable that implements a stream-unary RPC + method that accepts an iterator of zero or more requests + and returns a single response. + request_serializer: A callable that when called on a request + value returns a bytestring corresponding to that value. + request_deserializer: A callable that when called on a + bytestring returns the request value corresponding to that + bytestring. + response_serializer: A callable that when called on a + response value returns the bytestring corresponding to + that value. + response_deserializer: A callable that when called on a + bytestring returns the response value corresponding to + that bytestring. + + Returns: + An interfaces.RpcMethod constructed from the given + arguments representing a streaming-request/unary-response + RPC method. + """ + return _RpcMethod( + interfaces.Cardinality.STREAM_UNARY, None, None, behavior, None, + request_serializer, request_deserializer, response_serializer, + response_deserializer) + + +def stream_stream_rpc_method( + behavior, request_serializer, request_deserializer, response_serializer, + response_deserializer): + """Constructs an interfaces.RpcMethod for the given behavior. + + Args: + behavior: A callable that implements a stream-stream RPC + method that accepts an iterator of zero or more requests + and returns an iterator of zero or more responses. + request_serializer: A callable that when called on a request + value returns a bytestring corresponding to that value. + request_deserializer: A callable that when called on a + bytestring returns the request value corresponding to that + bytestring. + response_serializer: A callable that when called on a + response value returns the bytestring corresponding to + that value. + response_deserializer: A callable that when called on a + bytestring returns the response value corresponding to + that bytestring. + + Returns: + An interfaces.RpcMethod constructed from the given + arguments representing a + streaming-request/streaming-response RPC method. + """ + return _RpcMethod( + interfaces.Cardinality.STREAM_STREAM, None, None, None, behavior, + request_serializer, request_deserializer, response_serializer, + response_deserializer) diff --git a/src/python/setup.py b/src/python/src/setup.py index 5e566bad4f..93af4d68ca 100644 --- a/src/python/setup.py +++ b/src/python/src/setup.py @@ -32,19 +32,17 @@ from distutils import core as _core _EXTENSION_SOURCES = ( - 'src/_adapter/_c.c', - 'src/_adapter/_call.c', - 'src/_adapter/_channel.c', - 'src/_adapter/_completion_queue.c', - 'src/_adapter/_error.c', - 'src/_adapter/_server.c', - 'src/_adapter/_server_credentials.c', + '_adapter/_c.c', + '_adapter/_call.c', + '_adapter/_channel.c', + '_adapter/_completion_queue.c', + '_adapter/_error.c', + '_adapter/_server.c', + '_adapter/_server_credentials.c', ) _EXTENSION_INCLUDE_DIRECTORIES = ( - 'src', - # TODO(nathaniel): Can this path specification be made to work? - #'../../include', + '.', ) _EXTENSION_LIBRARIES = ( @@ -52,16 +50,11 @@ _EXTENSION_LIBRARIES = ( 'grpc', ) -_EXTENSION_LIBRARY_DIRECTORIES = ( - # TODO(nathaniel): Can this path specification be made to work? - #'../../libs/dbg', -) - _EXTENSION_MODULE = _core.Extension( '_adapter._c', sources=list(_EXTENSION_SOURCES), include_dirs=_EXTENSION_INCLUDE_DIRECTORIES, libraries=_EXTENSION_LIBRARIES, - library_dirs=_EXTENSION_LIBRARY_DIRECTORIES) + ) _PACKAGES=( '_adapter', @@ -73,15 +66,17 @@ _PACKAGES=( '_framework.face.testing', '_framework.foundation', '_junkdrawer', + 'grpc_early_adopter', ) _PACKAGE_DIRECTORIES = { - '_adapter': 'src/_adapter', - '_framework': 'src/_framework', - '_junkdrawer': 'src/_junkdrawer', + '_adapter': '_adapter', + '_framework': '_framework', + '_junkdrawer': '_junkdrawer', + 'grpc_early_adopter': 'grpc_early_adopter', } _core.setup( - name='grpc', version='0.0.1', + name='grpc-2015', version='0.0.1', ext_modules=[_EXTENSION_MODULE], packages=_PACKAGES, package_dir=_PACKAGE_DIRECTORIES) |