diff options
author | Craig Tiller <ctiller@google.com> | 2015-02-02 14:43:23 -0800 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-02-02 14:43:23 -0800 |
commit | e863e1b855dcb4d1d156d3fa3d74467cedf3eb8f (patch) | |
tree | 2e0026e16c8790b9cef98fdf3592c3e4d07b22bd | |
parent | a7170ac4ad04591a5db33d5271ef6ce30662e9fd (diff) | |
parent | c10f5c97806f84f8805d236cd280f8ff3b26b574 (diff) |
Merge github.com:google/grpc into async-api
26 files changed, 249 insertions, 213 deletions
diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index 58fce64ff1..2bf5348315 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -56,6 +56,8 @@ #define GPR_CPU_LINUX 1 #define GPR_GCC_SYNC 1 #define GPR_POSIX_MULTIPOLL_WITH_POLL 1 +#define GPR_POSIX_WAKEUP_FD 1 +#define GPR_LINUX_EVENTFD 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 @@ -68,7 +70,7 @@ #define GPR_GCC_ATOMIC 1 #define GPR_LINUX 1 #define GPR_POSIX_MULTIPOLL_WITH_POLL 1 -#define GPR_POSIX_HAS_SPECIAL_WAKEUP_FD 1 +#define GPR_POSIX_WAKEUP_FD 1 #define GPR_LINUX_EVENTFD 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 @@ -86,6 +88,8 @@ #define GPR_GCC_ATOMIC 1 #define GPR_POSIX_LOG 1 #define GPR_POSIX_MULTIPOLL_WITH_POLL 1 +#define GPR_POSIX_WAKEUP_FD 1 +#define GPR_POSIX_NO_SPECIAL_WAKEUP_FD 1 #define GPR_POSIX_SOCKET 1 #define GPR_POSIX_SOCKETADDR 1 #define GPR_POSIX_SOCKETUTILS 1 diff --git a/src/core/iomgr/pollset_kick.c b/src/core/iomgr/pollset_kick.c index 42b110d124..238ec75c61 100644 --- a/src/core/iomgr/pollset_kick.c +++ b/src/core/iomgr/pollset_kick.c @@ -138,6 +138,7 @@ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) { } void grpc_pollset_kick_global_init_fallback_fd(void) { + gpr_mu_init(&fd_freelist_mu); grpc_wakeup_fd_global_init_force_fallback(); } diff --git a/src/core/iomgr/wakeup_fd_nospecial.c b/src/core/iomgr/wakeup_fd_nospecial.c index 21e8074d50..c1038bf379 100644 --- a/src/core/iomgr/wakeup_fd_nospecial.c +++ b/src/core/iomgr/wakeup_fd_nospecial.c @@ -38,16 +38,17 @@ #include <grpc/support/port_platform.h> -#ifndef GPR_POSIX_HAS_SPECIAL_WAKEUP_FD +#ifdef GPR_POSIX_NO_SPECIAL_WAKEUP_FD -#include "src/core/iomgr/wakeup_fd.h" +#include "src/core/iomgr/wakeup_fd_posix.h" +#include <stddef.h> static int check_availability_invalid(void) { return 0; } -const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = { +const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = { NULL, NULL, NULL, NULL, check_availability_invalid }; -#endif /* GPR_POSIX_HAS_SPECIAL_WAKEUP */ +#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c index f36e6eeb9f..f895478990 100644 --- a/src/core/iomgr/wakeup_fd_pipe.c +++ b/src/core/iomgr/wakeup_fd_pipe.c @@ -31,7 +31,10 @@ * */ -/* TODO(klempner): Allow this code to be disabled. */ +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_WAKEUP_FD + #include "src/core/iomgr/wakeup_fd_posix.h" #include <errno.h> @@ -87,7 +90,8 @@ static int pipe_check_availability(void) { return 1; } -const grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable = { +const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = { pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability }; +#endif /* GPR_POSIX_WAKUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_pipe.h b/src/core/iomgr/wakeup_fd_pipe.h index fc2898f570..a2fcde5b55 100644 --- a/src/core/iomgr/wakeup_fd_pipe.h +++ b/src/core/iomgr/wakeup_fd_pipe.h @@ -36,6 +36,6 @@ #include "src/core/iomgr/wakeup_fd_posix.h" -extern grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable; +extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable; #endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ */ diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c index 9107cf37b1..d3cc3ec570 100644 --- a/src/core/iomgr/wakeup_fd_posix.c +++ b/src/core/iomgr/wakeup_fd_posix.c @@ -31,6 +31,10 @@ * */ +#include <grpc/support/port_platform.h> + +#ifdef GPR_POSIX_WAKEUP_FD + #include "src/core/iomgr/wakeup_fd_posix.h" #include "src/core/iomgr/wakeup_fd_pipe.h" #include <stddef.h> @@ -38,15 +42,15 @@ static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL; void grpc_wakeup_fd_global_init(void) { - if (specialized_wakeup_fd_vtable.check_availability()) { - wakeup_fd_vtable = &specialized_wakeup_fd_vtable; + if (grpc_specialized_wakeup_fd_vtable.check_availability()) { + wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable; } else { - wakeup_fd_vtable = &pipe_wakeup_fd_vtable; + wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } } void grpc_wakeup_fd_global_init_force_fallback(void) { - wakeup_fd_vtable = &pipe_wakeup_fd_vtable; + wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable; } void grpc_wakeup_fd_global_destroy(void) { @@ -68,3 +72,5 @@ void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) { void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) { wakeup_fd_vtable->destroy(fd_info); } + +#endif /* GPR_POSIX_WAKEUP_FD */ diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h index c2769afb2a..75bb9fc766 100644 --- a/src/core/iomgr/wakeup_fd_posix.h +++ b/src/core/iomgr/wakeup_fd_posix.h @@ -62,29 +62,14 @@ #ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ #define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ -typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info; - void grpc_wakeup_fd_global_init(void); void grpc_wakeup_fd_global_destroy(void); - -void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info); -void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info); - -#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) - /* Force using the fallback implementation. This is intended for testing * purposes only.*/ void grpc_wakeup_fd_global_init_force_fallback(void); -/* Private structures; don't access their fields directly outside of wakeup fd - * code. */ -struct grpc_wakeup_fd_info { - int read_fd; - int write_fd; -}; +typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info; typedef struct grpc_wakeup_fd_vtable { void (*create)(grpc_wakeup_fd_info *fd_info); @@ -95,8 +80,20 @@ typedef struct grpc_wakeup_fd_vtable { int (*check_availability)(void); } grpc_wakeup_fd_vtable; +struct grpc_wakeup_fd_info { + int read_fd; + int write_fd; +}; + +#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd) + +void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info); +void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info); + /* Defined in some specialized implementation's .c file, or by * wakeup_fd_nospecial.c if no such implementation exists. */ -extern const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable; +extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable; #endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ */ diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c index 1292c9e8c3..ab2d2e5a74 100644 --- a/src/core/support/log_posix.c +++ b/src/core/support/log_posix.c @@ -64,7 +64,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity, va_end(args); if (ret < 0) { message = NULL; - } else if (ret <= sizeof(buf) - 1) { + } else if ((size_t)ret <= sizeof(buf) - 1) { message = buf; } else { message = allocated = gpr_malloc(ret + 1); diff --git a/src/python/src/_adapter/_links_test.py b/src/python/src/_adapter/_links_test.py index 94f17d007b..8341460a9a 100644 --- a/src/python/src/_adapter/_links_test.py +++ b/src/python/src/_adapter/_links_test.py @@ -80,8 +80,8 @@ class RoundTripTest(unittest.TestCase): rear_link.start() front_to_back_ticket = tickets.FrontToBackPacket( - test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, - None, None, _TIMEOUT) + test_operation_id, 0, tickets.Kind.ENTIRE, test_method, + interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) with test_fore_link.condition: @@ -133,8 +133,9 @@ class RoundTripTest(unittest.TestCase): rear_link.start() front_to_back_ticket = tickets.FrontToBackPacket( - test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL, - None, test_front_to_back_datum, _TIMEOUT) + test_operation_id, 0, tickets.Kind.ENTIRE, test_method, + interfaces.ServicedSubscription.Kind.FULL, None, + test_front_to_back_datum, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) with test_fore_link.condition: @@ -196,7 +197,7 @@ class RoundTripTest(unittest.TestCase): commencement_ticket = tickets.FrontToBackPacket( test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method, - interfaces.FULL, None, None, _TIMEOUT) + interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) fore_sequence_number = 1 rear_link.accept_front_to_back_ticket(commencement_ticket) for request in scenario.requests(): diff --git a/src/python/src/_adapter/_lonely_rear_link_test.py b/src/python/src/_adapter/_lonely_rear_link_test.py index 18279e05a2..7ccdb0b530 100644 --- a/src/python/src/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/_adapter/_lonely_rear_link_test.py @@ -69,7 +69,7 @@ class LonelyRearLinkTest(unittest.TestCase): front_to_back_ticket = packets.FrontToBackPacket( test_operation_id, 0, front_to_back_ticket_kind, test_method, - interfaces.FULL, None, None, _TIMEOUT) + interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT) rear_link.accept_front_to_back_ticket(front_to_back_ticket) with fore_link.condition: diff --git a/src/python/src/_adapter/fore.py b/src/python/src/_adapter/fore.py index 0f00957938..c307e7ce63 100644 --- a/src/python/src/_adapter/fore.py +++ b/src/python/src/_adapter/fore.py @@ -116,7 +116,8 @@ class ForeLink(ticket_interfaces.ForeLink): self._response_serializers[method]) ticket = tickets.FrontToBackPacket( - call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None, + call, 0, tickets.Kind.COMMENCEMENT, method, + interfaces.ServicedSubscription.Kind.FULL, None, None, service_acceptance.deadline - time.time()) self._rear_link.accept_front_to_back_ticket(ticket) diff --git a/src/python/src/_framework/base/interfaces.py b/src/python/src/_framework/base/interfaces.py index de7137cbf7..70030e564d 100644 --- a/src/python/src/_framework/base/interfaces.py +++ b/src/python/src/_framework/base/interfaces.py @@ -29,27 +29,24 @@ """Interfaces defined and used by the base layer of RPC Framework.""" -# TODO(nathaniel): Use Python's new enum library for enumerated types rather -# than constants merely placed close together. - import abc +import enum # stream is referenced from specification in this module. from _framework.foundation import stream # pylint: disable=unused-import -# Operation outcomes. -COMPLETED = 'completed' -CANCELLED = 'cancelled' -EXPIRED = 'expired' -RECEPTION_FAILURE = 'reception failure' -TRANSMISSION_FAILURE = 'transmission failure' -SERVICER_FAILURE = 'servicer failure' -SERVICED_FAILURE = 'serviced failure' -# Subscription categories. -FULL = 'full' -TERMINATION_ONLY = 'termination only' -NONE = 'none' +@enum.unique +class Outcome(enum.Enum): + """Operation outcomes.""" + + COMPLETED = 'completed' + CANCELLED = 'cancelled' + EXPIRED = 'expired' + RECEPTION_FAILURE = 'reception failure' + TRANSMISSION_FAILURE = 'transmission failure' + SERVICER_FAILURE = 'servicer failure' + SERVICED_FAILURE = 'serviced failure' class OperationContext(object): @@ -70,9 +67,7 @@ class OperationContext(object): """Adds a function to be called upon operation termination. Args: - callback: A callable that will be passed one of COMPLETED, CANCELLED, - EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or - SERVICED_FAILURE. + callback: A callable that will be passed an Outcome value. """ raise NotImplementedError() @@ -167,11 +162,20 @@ class ServicedSubscription(object): """A sum type representing a serviced's interest in an operation. Attributes: - category: One of FULL, TERMINATION_ONLY, or NONE. - ingestor: A ServicedIngestor. Must be present if category is FULL. + kind: A Kind value. + ingestor: A ServicedIngestor. Must be present if kind is Kind.FULL. Must + be None if kind is Kind.TERMINATION_ONLY or Kind.NONE. """ __metaclass__ = abc.ABCMeta + @enum.unique + class Kind(enum.Enum): + """Kinds of subscription.""" + + FULL = 'full' + TERMINATION_ONLY = 'termination only' + NONE = 'none' + class End(object): """Common type for entry-point objects on both sides of an operation.""" @@ -182,9 +186,8 @@ class End(object): """Reports the number of terminated operations broken down by outcome. Returns: - A dictionary from operation outcome constant (COMPLETED, CANCELLED, - EXPIRED, and so on) to an integer representing the number of operations - that terminated with that outcome. + A dictionary from Outcome value to an integer identifying the number + of operations that terminated with that outcome. """ raise NotImplementedError() diff --git a/src/python/src/_framework/base/interfaces_test.py b/src/python/src/_framework/base/interfaces_test.py index 6eb07ea505..8e26d884ec 100644 --- a/src/python/src/_framework/base/interfaces_test.py +++ b/src/python/src/_framework/base/interfaces_test.py @@ -49,13 +49,13 @@ TRIGGERED_FAILURE = 'triggered failure' WAIT_ON_CONDITION = 'wait on condition' EMPTY_OUTCOME_DICT = { - interfaces.COMPLETED: 0, - interfaces.CANCELLED: 0, - interfaces.EXPIRED: 0, - interfaces.RECEPTION_FAILURE: 0, - interfaces.TRANSMISSION_FAILURE: 0, - interfaces.SERVICER_FAILURE: 0, - interfaces.SERVICED_FAILURE: 0, + interfaces.Outcome.COMPLETED: 0, + interfaces.Outcome.CANCELLED: 0, + interfaces.Outcome.EXPIRED: 0, + interfaces.Outcome.RECEPTION_FAILURE: 0, + interfaces.Outcome.TRANSMISSION_FAILURE: 0, + interfaces.Outcome.SERVICER_FAILURE: 0, + interfaces.Outcome.SERVICED_FAILURE: 0, } @@ -169,7 +169,8 @@ class FrontAndBackTest(object): SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT, util.none_serviced_subscription(), 'test trace ID') util.wait_for_idle(self.front) - self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) + self.assertEqual( + 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED]) # Assuming nothing really pathological (such as pauses on the order of # SMALL_TIMEOUT interfering with this test) there are a two different ways @@ -183,7 +184,7 @@ class FrontAndBackTest(object): first_back_possibility = EMPTY_OUTCOME_DICT # (2) The packet arrived at the back and the back completed the operation. second_back_possibility = dict(EMPTY_OUTCOME_DICT) - second_back_possibility[interfaces.COMPLETED] = 1 + second_back_possibility[interfaces.Outcome.COMPLETED] = 1 self.assertIn( back_operation_stats, (first_back_possibility, second_back_possibility)) # It's true that if the packet had arrived at the back and the back had @@ -204,8 +205,10 @@ class FrontAndBackTest(object): util.wait_for_idle(self.front) util.wait_for_idle(self.back) - self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) - self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) + self.assertEqual( + 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED]) + self.assertEqual( + 1, self.back.operation_stats()[interfaces.Outcome.COMPLETED]) self.assertListEqual([(test_payload, True)], test_consumer.calls) def testBidirectionalStreamingEcho(self): @@ -226,8 +229,10 @@ class FrontAndBackTest(object): util.wait_for_idle(self.front) util.wait_for_idle(self.back) - self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED]) - self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED]) + self.assertEqual( + 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED]) + self.assertEqual( + 1, self.back.operation_stats()[interfaces.Outcome.COMPLETED]) self.assertListEqual(test_payloads, test_consumer.values()) def testCancellation(self): @@ -242,7 +247,8 @@ class FrontAndBackTest(object): operation.cancel() util.wait_for_idle(self.front) - self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED]) + self.assertEqual( + 1, self.front.operation_stats()[interfaces.Outcome.CANCELLED]) util.wait_for_idle(self.back) self.assertListEqual([], test_consumer.calls) @@ -260,7 +266,7 @@ class FrontAndBackTest(object): # The back started processing based on the first packet and then stopped # upon receiving the cancellation packet. second_back_possibility = dict(EMPTY_OUTCOME_DICT) - second_back_possibility[interfaces.CANCELLED] = 1 + second_back_possibility[interfaces.Outcome.CANCELLED] = 1 self.assertIn( back_operation_stats, (first_back_possibility, second_back_possibility)) @@ -292,8 +298,10 @@ class FrontAndBackTest(object): duration = termination_time_cell[0] - start_time self.assertLessEqual(timeout, duration) self.assertLess(duration, timeout + allowance) - self.assertEqual(interfaces.EXPIRED, outcome_cell[0]) + self.assertEqual(interfaces.Outcome.EXPIRED, outcome_cell[0]) util.wait_for_idle(self.front) - self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED]) + self.assertEqual( + 1, self.front.operation_stats()[interfaces.Outcome.EXPIRED]) util.wait_for_idle(self.back) - self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED]) + self.assertLessEqual( + 1, self.back.operation_stats()[interfaces.Outcome.EXPIRED]) diff --git a/src/python/src/_framework/base/packets/_ends.py b/src/python/src/_framework/base/packets/_ends.py index baaf5cacf9..b1d16451e2 100644 --- a/src/python/src/_framework/base/packets/_ends.py +++ b/src/python/src/_framework/base/packets/_ends.py @@ -51,13 +51,13 @@ from _framework.foundation import callable_util _IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!' _OPERATION_OUTCOMES = ( - base_interfaces.COMPLETED, - base_interfaces.CANCELLED, - base_interfaces.EXPIRED, - base_interfaces.RECEPTION_FAILURE, - base_interfaces.TRANSMISSION_FAILURE, - base_interfaces.SERVICER_FAILURE, - base_interfaces.SERVICED_FAILURE, + base_interfaces.Outcome.COMPLETED, + base_interfaces.Outcome.CANCELLED, + base_interfaces.Outcome.EXPIRED, + base_interfaces.Outcome.RECEPTION_FAILURE, + base_interfaces.Outcome.TRANSMISSION_FAILURE, + base_interfaces.Outcome.SERVICER_FAILURE, + base_interfaces.Outcome.SERVICED_FAILURE, ) @@ -193,10 +193,10 @@ def _front_operate( lock = threading.Lock() with lock: termination_manager = _termination.front_termination_manager( - work_pool, utility_pool, termination_action, subscription.category) + work_pool, utility_pool, termination_action, subscription.kind) transmission_manager = _transmission.front_transmission_manager( lock, transmission_pool, callback, operation_id, name, - subscription.category, trace_id, timeout, termination_manager) + subscription.kind, trace_id, timeout, termination_manager) operation_context = _context.OperationContext( lock, operation_id, packets.Kind.SERVICED_FAILURE, termination_manager, transmission_manager) @@ -225,9 +225,10 @@ def _front_operate( transmission_manager.inmit(payload, complete) - returned_reception_manager = ( - None if subscription.category == base_interfaces.NONE - else reception_manager) + if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE: + returned_reception_manager = None + else: + returned_reception_manager = reception_manager return _FrontManagement( returned_reception_manager, emission_manager, operation_context, diff --git a/src/python/src/_framework/base/packets/_ingestion.py b/src/python/src/_framework/base/packets/_ingestion.py index ad5ed4cada..abc1e7a043 100644 --- a/src/python/src/_framework/base/packets/_ingestion.py +++ b/src/python/src/_framework/base/packets/_ingestion.py @@ -111,7 +111,7 @@ class _FrontConsumerCreator(_ConsumerCreator): def create_consumer(self, requirement): """See _ConsumerCreator.create_consumer for specification.""" - if self._subscription.category == interfaces.FULL: + if self._subscription.kind is interfaces.ServicedSubscription.Kind.FULL: try: return _ConsumerCreation( self._subscription.ingestor.consumer(self._operation_context), diff --git a/src/python/src/_framework/base/packets/_interfaces.py b/src/python/src/_framework/base/packets/_interfaces.py index 5f6c0593d0..d1bda95bf7 100644 --- a/src/python/src/_framework/base/packets/_interfaces.py +++ b/src/python/src/_framework/base/packets/_interfaces.py @@ -58,10 +58,7 @@ class TerminationManager(object): immediately. Args: - callback: A callable that will be passed one of base_interfaces.COMPLETED, - base_interfaces.CANCELLED, base_interfaces.EXPIRED, - base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE, - base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE. + callback: A callable that will be passed a base_interfaces.Outcome value. """ raise NotImplementedError() diff --git a/src/python/src/_framework/base/packets/_termination.py b/src/python/src/_framework/base/packets/_termination.py index d586c2167b..ae3ba1c16f 100644 --- a/src/python/src/_framework/base/packets/_termination.py +++ b/src/python/src/_framework/base/packets/_termination.py @@ -29,6 +29,8 @@ """State and behavior for operation termination.""" +import enum + from _framework.base import interfaces from _framework.base.packets import _constants from _framework.base.packets import _interfaces @@ -37,26 +39,32 @@ from _framework.foundation import callable_util _CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!' -# TODO(nathaniel): enum module. -_EMISSION = 'emission' -_TRANSMISSION = 'transmission' -_INGESTION = 'ingestion' - -_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,) -_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,) -_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,) - _KINDS_TO_OUTCOMES = { - packets.Kind.COMPLETION: interfaces.COMPLETED, - packets.Kind.CANCELLATION: interfaces.CANCELLED, - packets.Kind.EXPIRATION: interfaces.EXPIRED, - packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE, - packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE, - packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, - packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, + packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED, + packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED, + packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED, + packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE, + packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE, + packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE, + packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE, } +@enum.unique +class _Requirement(enum.Enum): + """Symbols indicating events required for termination.""" + + EMISSION = 'emission' + TRANSMISSION = 'transmission' + INGESTION = 'ingestion' + +_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,) +_BACK_NOT_LISTENING_REQUIREMENTS = ( + _Requirement.EMISSION, _Requirement.INGESTION,) +_LISTENING_REQUIREMENTS = ( + _Requirement.TRANSMISSION, _Requirement.INGESTION,) + + class _TerminationManager(_interfaces.TerminationManager): """An implementation of _interfaces.TerminationManager.""" @@ -68,9 +76,8 @@ class _TerminationManager(_interfaces.TerminationManager): work_pool: A thread pool in which customer work will be done. utility_pool: A thread pool in which work utility work will be done. action: An action to call on operation termination. - requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION - identifying what must finish for the operation to be considered - completed. + requirements: A combination of _Requirement values identifying what + must finish for the operation to be considered completed. local_failure: A packets.Kind specifying what constitutes local failure of customer work. """ @@ -137,21 +144,21 @@ class _TerminationManager(_interfaces.TerminationManager): def emission_complete(self): """See superclass method for specification.""" if self._outstanding_requirements is not None: - self._outstanding_requirements.discard(_EMISSION) + self._outstanding_requirements.discard(_Requirement.EMISSION) if not self._outstanding_requirements: self._terminate(packets.Kind.COMPLETION) def transmission_complete(self): """See superclass method for specification.""" if self._outstanding_requirements is not None: - self._outstanding_requirements.discard(_TRANSMISSION) + self._outstanding_requirements.discard(_Requirement.TRANSMISSION) if not self._outstanding_requirements: self._terminate(packets.Kind.COMPLETION) def ingestion_complete(self): """See superclass method for specification.""" if self._outstanding_requirements is not None: - self._outstanding_requirements.discard(_INGESTION) + self._outstanding_requirements.discard(_Requirement.INGESTION) if not self._outstanding_requirements: self._terminate(packets.Kind.COMPLETION) @@ -163,39 +170,46 @@ class _TerminationManager(_interfaces.TerminationManager): self._terminate(kind) -def front_termination_manager(work_pool, utility_pool, action, subscription): +def front_termination_manager( + work_pool, utility_pool, action, subscription_kind): """Creates a TerminationManager appropriate for front-side use. Args: work_pool: A thread pool in which customer work will be done. utility_pool: A thread pool in which work utility work will be done. action: An action to call on operation termination. - subscription: One of interfaces.FULL, interfaces.termination_only, or - interfaces.NONE. + subscription_kind: An interfaces.ServicedSubscription.Kind value. Returns: A TerminationManager appropriate for front-side use. """ + if subscription_kind is interfaces.ServicedSubscription.Kind.NONE: + requirements = _FRONT_NOT_LISTENING_REQUIREMENTS + else: + requirements = _LISTENING_REQUIREMENTS + return _TerminationManager( - work_pool, utility_pool, action, - _FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else - _LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE) + work_pool, utility_pool, action, requirements, + packets.Kind.SERVICED_FAILURE) -def back_termination_manager(work_pool, utility_pool, action, subscription): +def back_termination_manager(work_pool, utility_pool, action, subscription_kind): """Creates a TerminationManager appropriate for back-side use. Args: work_pool: A thread pool in which customer work will be done. utility_pool: A thread pool in which work utility work will be done. action: An action to call on operation termination. - subscription: One of interfaces.FULL, interfaces.termination_only, or - interfaces.NONE. + subscription_kind: An interfaces.ServicedSubscription.Kind value. Returns: A TerminationManager appropriate for back-side use. """ + if subscription_kind is interfaces.ServicedSubscription.Kind.NONE: + requirements = _BACK_NOT_LISTENING_REQUIREMENTS + else: + requirements = _LISTENING_REQUIREMENTS + return _TerminationManager( - work_pool, utility_pool, action, - _BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else - _LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE) + work_pool, utility_pool, action, requirements, + packets.Kind.SERVICER_FAILURE) diff --git a/src/python/src/_framework/base/packets/_transmission.py b/src/python/src/_framework/base/packets/_transmission.py index 006128774d..24fe6e6164 100644 --- a/src/python/src/_framework/base/packets/_transmission.py +++ b/src/python/src/_framework/base/packets/_transmission.py @@ -91,20 +91,19 @@ class _Packetizer(object): class _FrontPacketizer(_Packetizer): """Front-side packet-creating behavior.""" - def __init__(self, name, subscription, trace_id, timeout): + def __init__(self, name, subscription_kind, trace_id, timeout): """Constructor. Args: name: The name of the operation. - subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or - interfaces.NONE describing the interest the front has in packets sent - from the back. + subscription_kind: An interfaces.ServicedSubscription.Kind value + describing the interest the front has in packets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. """ self._name = name - self._subscription = subscription + self._subscription_kind = subscription_kind self._trace_id = trace_id self._timeout = timeout @@ -114,13 +113,13 @@ class _FrontPacketizer(_Packetizer): return packets.FrontToBackPacket( operation_id, sequence_number, packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION, - self._name, self._subscription, self._trace_id, payload, + self._name, self._subscription_kind, self._trace_id, payload, self._timeout) else: return packets.FrontToBackPacket( operation_id, 0, packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT, - self._name, self._subscription, self._trace_id, payload, + self._name, self._subscription_kind, self._trace_id, payload, self._timeout) def packetize_abortion(self, operation_id, sequence_number, kind): @@ -335,8 +334,8 @@ class _TransmittingTransmissionManager(TransmissionManager): def front_transmission_manager( - lock, pool, callback, operation_id, name, subscription, trace_id, timeout, - termination_manager): + lock, pool, callback, operation_id, name, subscription_kind, trace_id, + timeout, termination_manager): """Creates a TransmissionManager appropriate for front-side use. Args: @@ -347,9 +346,8 @@ def front_transmission_manager( of the operation. operation_id: The operation's ID. name: The name of the operation. - subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or - interfaces.NONE describing the interest the front has in packets sent - from the back. + subscription_kind: An interfaces.ServicedSubscription.Kind value + describing the interest the front has in packets sent from the back. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. timeout: A length of time in seconds to allow for the entire operation. @@ -361,12 +359,13 @@ def front_transmission_manager( """ return _TransmittingTransmissionManager( lock, pool, callback, operation_id, _FrontPacketizer( - name, subscription, trace_id, timeout), + name, subscription_kind, trace_id, timeout), termination_manager) def back_transmission_manager( - lock, pool, callback, operation_id, termination_manager, subscription): + lock, pool, callback, operation_id, termination_manager, + subscription_kind): """Creates a TransmissionManager appropriate for back-side use. Args: @@ -378,14 +377,13 @@ def back_transmission_manager( operation_id: The operation's ID. termination_manager: The _interfaces.TerminationManager associated with this operation. - subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or - interfaces.NONE describing the interest the front has in packets sent from - the back. + subscription_kind: An interfaces.ServicedSubscription.Kind value + describing the interest the front has in packets sent from the back. Returns: A TransmissionManager appropriate for back-side use. """ - if subscription == interfaces.NONE: + if subscription_kind is interfaces.ServicedSubscription.Kind.NONE: return _EmptyTransmissionManager() else: return _TransmittingTransmissionManager( diff --git a/src/python/src/_framework/base/packets/packets.py b/src/python/src/_framework/base/packets/packets.py index 1315ca650e..f7503bdcd6 100644 --- a/src/python/src/_framework/base/packets/packets.py +++ b/src/python/src/_framework/base/packets/packets.py @@ -71,10 +71,9 @@ class FrontToBackPacket( Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE. name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind. - subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or - interfaces.NONE describing the interest the front has in packets sent from - the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE. - Must be None for any other kind. + subscription: An interfaces.ServicedSubscription.Kind value describing the + interest the front has in packets sent from the back. Must be present if + kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind. trace_id: A uuid.UUID identifying a set of related operations to which this operation belongs. May be None. payload: A customer payload object. Must be present if kind is diff --git a/src/python/src/_framework/base/util.py b/src/python/src/_framework/base/util.py index 6bbd18a59a..35ce0443fc 100644 --- a/src/python/src/_framework/base/util.py +++ b/src/python/src/_framework/base/util.py @@ -36,13 +36,14 @@ from _framework.base import interfaces class _ServicedSubscription( - collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']), + collections.namedtuple('_ServicedSubscription', ['kind', 'ingestor']), interfaces.ServicedSubscription): """See interfaces.ServicedSubscription for specification.""" -_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None) +_NONE_SUBSCRIPTION = _ServicedSubscription( + interfaces.ServicedSubscription.Kind.NONE, None) _TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription( - interfaces.TERMINATION_ONLY, None) + interfaces.ServicedSubscription.Kind.TERMINATION_ONLY, None) def none_serviced_subscription(): @@ -72,12 +73,14 @@ def full_serviced_subscription(ingestor): """Creates a "full" interfaces.ServicedSubscription object. Args: - ingestor: A ServicedIngestor. + ingestor: An interfaces.ServicedIngestor. Returns: - A ServicedSubscription object indicating a full subscription. + An interfaces.ServicedSubscription object indicating a full + subscription. """ - return _ServicedSubscription(interfaces.FULL, ingestor) + return _ServicedSubscription( + interfaces.ServicedSubscription.Kind.FULL, ingestor) def wait_for_idle(end): diff --git a/src/python/src/_framework/face/_calls.py b/src/python/src/_framework/face/_calls.py index ab58e6378b..9128aef7c4 100644 --- a/src/python/src/_framework/face/_calls.py +++ b/src/python/src/_framework/face/_calls.py @@ -94,7 +94,7 @@ class _OperationCancellableIterator(interfaces.CancellableIterator): def cancel(self): self._operation.cancel() - self._rendezvous.set_outcome(base_interfaces.CANCELLED) + self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED) class _OperationFuture(future.Future): @@ -150,15 +150,12 @@ class _OperationFuture(future.Future): """Indicates to this object that the operation has terminated. Args: - operation_outcome: One of base_interfaces.COMPLETED, - base_interfaces.CANCELLED, base_interfaces.EXPIRED, - base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE, - base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE - indicating the categorical outcome of the operation. + operation_outcome: A base_interfaces.Outcome value indicating the + outcome of the operation. """ with self._condition: if (self._outcome is None and - operation_outcome != base_interfaces.COMPLETED): + operation_outcome is not base_interfaces.Outcome.COMPLETED): self._outcome = future.raised( _control.abortion_outcome_to_exception(operation_outcome)) self._condition.notify_all() diff --git a/src/python/src/_framework/face/_control.py b/src/python/src/_framework/face/_control.py index 2c221321d6..9f1bf6d5fd 100644 --- a/src/python/src/_framework/face/_control.py +++ b/src/python/src/_framework/face/_control.py @@ -40,13 +40,17 @@ from _framework.foundation import stream INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-(' _OPERATION_OUTCOME_TO_RPC_ABORTION = { - base_interfaces.CANCELLED: interfaces.CANCELLED, - base_interfaces.EXPIRED: interfaces.EXPIRED, - base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE, - base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE, - base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE, - base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE, - } + base_interfaces.Outcome.CANCELLED: interfaces.Abortion.CANCELLED, + base_interfaces.Outcome.EXPIRED: interfaces.Abortion.EXPIRED, + base_interfaces.Outcome.RECEPTION_FAILURE: + interfaces.Abortion.NETWORK_FAILURE, + base_interfaces.Outcome.TRANSMISSION_FAILURE: + interfaces.Abortion.NETWORK_FAILURE, + base_interfaces.Outcome.SERVICED_FAILURE: + interfaces.Abortion.SERVICED_FAILURE, + base_interfaces.Outcome.SERVICER_FAILURE: + interfaces.Abortion.SERVICER_FAILURE, +} def _as_operation_termination_callback(rpc_abortion_callback): @@ -59,13 +63,13 @@ def _as_operation_termination_callback(rpc_abortion_callback): def _abortion_outcome_to_exception(abortion_outcome): - if abortion_outcome == base_interfaces.CANCELLED: + if abortion_outcome == base_interfaces.Outcome.CANCELLED: return exceptions.CancellationError() - elif abortion_outcome == base_interfaces.EXPIRED: + elif abortion_outcome == base_interfaces.Outcome.EXPIRED: return exceptions.ExpirationError() - elif abortion_outcome == base_interfaces.SERVICER_FAILURE: + elif abortion_outcome == base_interfaces.Outcome.SERVICER_FAILURE: return exceptions.ServicerError() - elif abortion_outcome == base_interfaces.SERVICED_FAILURE: + elif abortion_outcome == base_interfaces.Outcome.SERVICED_FAILURE: return exceptions.ServicedError() else: return exceptions.NetworkError() @@ -133,7 +137,7 @@ class Rendezvous(stream.Consumer): def set_outcome(self, outcome): with self._condition: - if outcome != base_interfaces.COMPLETED: + if outcome is not base_interfaces.Outcome.COMPLETED: self._abortion = outcome self._condition.notify() diff --git a/src/python/src/_framework/face/interfaces.py b/src/python/src/_framework/face/interfaces.py index 0cc7c70df3..2480454369 100644 --- a/src/python/src/_framework/face/interfaces.py +++ b/src/python/src/_framework/face/interfaces.py @@ -30,6 +30,7 @@ """Interfaces for the face layer of RPC Framework.""" import abc +import enum # exceptions, abandonment, and future are referenced from specification in this # module. @@ -58,14 +59,15 @@ class CancellableIterator(object): raise NotImplementedError() -# Constants that categorize RPC abortion. -# TODO(nathaniel): Learn and use Python's enum library for this de facto -# enumerated type -CANCELLED = 'abortion: cancelled' -EXPIRED = 'abortion: expired' -NETWORK_FAILURE = 'abortion: network failure' -SERVICED_FAILURE = 'abortion: serviced failure' -SERVICER_FAILURE = 'abortion: servicer failure' +@enum.unique +class Abortion(enum.Enum): + """Categories of RPC abortion.""" + + CANCELLED = 'cancelled' + EXPIRED = 'expired' + NETWORK_FAILURE = 'network failure' + SERVICED_FAILURE = 'serviced failure' + SERVICER_FAILURE = 'servicer failure' class RpcContext(object): @@ -93,9 +95,8 @@ class RpcContext(object): """Registers a callback to be called if the RPC is aborted. Args: - abortion_callback: A callable to be called and passed one of CANCELLED, - EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the - event of RPC abortion. + abortion_callback: A callable to be called and passed an Abortion value + in the event of RPC abortion. """ raise NotImplementedError() @@ -474,9 +475,8 @@ class Stub(object): request: The request value for the RPC. response_callback: A callback to be called to accept the response value of the RPC. - abortion_callback: A callback to be called to accept one of CANCELLED, - EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC - abortion. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. Returns: @@ -494,9 +494,8 @@ class Stub(object): request: The request value for the RPC. response_consumer: A stream.Consumer to be called to accept the response values of the RPC. - abortion_callback: A callback to be called to accept one of CANCELLED, - EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC - abortion. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. Returns: @@ -513,9 +512,8 @@ class Stub(object): name: The RPC method name. response_callback: A callback to be called to accept the response value of the RPC. - abortion_callback: A callback to be called to accept one of CANCELLED, - EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC - abortion. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. Returns: @@ -533,9 +531,8 @@ class Stub(object): name: The RPC method name. response_consumer: A stream.Consumer to be called to accept the response values of the RPC. - abortion_callback: A callback to be called to accept one of CANCELLED, - EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC - abortion. + abortion_callback: A callback to be called and passed an Abortion value + in the event of RPC abortion. timeout: A duration of time in seconds to allow for the RPC. Returns: diff --git a/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py index dba73a9368..cb786f500c 100644 --- a/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py +++ b/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py @@ -176,7 +176,7 @@ class EventInvocationSynchronousEventServiceTestCase( name, request, callback.complete, callback.abort, _TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.EXPIRED, callback.abortion()) + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) def testExpiredUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -190,7 +190,7 @@ class EventInvocationSynchronousEventServiceTestCase( name, request, callback, callback.abort, _TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.EXPIRED, callback.abortion()) + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) def testExpiredStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -202,7 +202,7 @@ class EventInvocationSynchronousEventServiceTestCase( name, callback.complete, callback.abort, _TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.EXPIRED, callback.abortion()) + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) def testExpiredStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -217,7 +217,7 @@ class EventInvocationSynchronousEventServiceTestCase( request_consumer.consume(request) callback.block_until_terminated() - self.assertEqual(interfaces.EXPIRED, callback.abortion()) + self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion()) def testFailedUnaryRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -231,7 +231,7 @@ class EventInvocationSynchronousEventServiceTestCase( name, request, callback.complete, callback.abort, _TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) def testFailedUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -245,7 +245,7 @@ class EventInvocationSynchronousEventServiceTestCase( name, request, callback, callback.abort, _TIMEOUT) callback.block_until_terminated() - self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) def testFailedStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -262,7 +262,7 @@ class EventInvocationSynchronousEventServiceTestCase( request_consumer.terminate() callback.block_until_terminated() - self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) def testFailedStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -279,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase( request_consumer.terminate() callback.block_until_terminated() - self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion()) + self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion()) def testParallelInvocations(self): for name, test_messages_sequence in ( @@ -321,7 +321,7 @@ class EventInvocationSynchronousEventServiceTestCase( call.cancel() callback.block_until_terminated() - self.assertEqual(interfaces.CANCELLED, callback.abortion()) + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) def testCancelledUnaryRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -335,7 +335,7 @@ class EventInvocationSynchronousEventServiceTestCase( call.cancel() callback.block_until_terminated() - self.assertEqual(interfaces.CANCELLED, callback.abortion()) + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) def testCancelledStreamRequestUnaryResponse(self): for name, test_messages_sequence in ( @@ -351,7 +351,7 @@ class EventInvocationSynchronousEventServiceTestCase( call.cancel() callback.block_until_terminated() - self.assertEqual(interfaces.CANCELLED, callback.abortion()) + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) def testCancelledStreamRequestStreamResponse(self): for name, test_messages_sequence in ( @@ -364,4 +364,4 @@ class EventInvocationSynchronousEventServiceTestCase( call.cancel() callback.block_until_terminated() - self.assertEqual(interfaces.CANCELLED, callback.abortion()) + self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion()) diff --git a/test/core/iomgr/poll_kick_posix_test.c b/test/core/iomgr/poll_kick_posix_test.c index 3c6d815c9d..2c5b444d3a 100644 --- a/test/core/iomgr/poll_kick_posix_test.c +++ b/test/core/iomgr/poll_kick_posix_test.c @@ -105,6 +105,7 @@ static void test_over_free(void) { grpc_pollset_kick_post_poll(&kick_state[i]); grpc_pollset_kick_destroy(&kick_state[i]); } + gpr_free(kick_state); } static void run_tests(void) { diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh index 46e5797f62..4abb412c95 100755 --- a/tools/run_tests/build_python.sh +++ b/tools/run_tests/build_python.sh @@ -7,6 +7,5 @@ cd $(dirname $0)/../.. root=`pwd` virtualenv python2.7_virtual_environment -python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0 -python2.7_virtual_environment/bin/pip install third_party/protobuf/python +python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0 protobuf==2.6.1 python2.7_virtual_environment/bin/pip install src/python |