aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-02-02 14:43:23 -0800
committerGravatar Craig Tiller <ctiller@google.com>2015-02-02 14:43:23 -0800
commite863e1b855dcb4d1d156d3fa3d74467cedf3eb8f (patch)
tree2e0026e16c8790b9cef98fdf3592c3e4d07b22bd
parenta7170ac4ad04591a5db33d5271ef6ce30662e9fd (diff)
parentc10f5c97806f84f8805d236cd280f8ff3b26b574 (diff)
Merge github.com:google/grpc into async-api
-rw-r--r--include/grpc/support/port_platform.h6
-rw-r--r--src/core/iomgr/pollset_kick.c1
-rw-r--r--src/core/iomgr/wakeup_fd_nospecial.c9
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.c8
-rw-r--r--src/core/iomgr/wakeup_fd_pipe.h2
-rw-r--r--src/core/iomgr/wakeup_fd_posix.c14
-rw-r--r--src/core/iomgr/wakeup_fd_posix.h31
-rw-r--r--src/core/support/log_posix.c2
-rw-r--r--src/python/src/_adapter/_links_test.py11
-rw-r--r--src/python/src/_adapter/_lonely_rear_link_test.py2
-rw-r--r--src/python/src/_adapter/fore.py3
-rw-r--r--src/python/src/_framework/base/interfaces.py49
-rw-r--r--src/python/src/_framework/base/interfaces_test.py44
-rw-r--r--src/python/src/_framework/base/packets/_ends.py25
-rw-r--r--src/python/src/_framework/base/packets/_ingestion.py2
-rw-r--r--src/python/src/_framework/base/packets/_interfaces.py5
-rw-r--r--src/python/src/_framework/base/packets/_termination.py82
-rw-r--r--src/python/src/_framework/base/packets/_transmission.py34
-rw-r--r--src/python/src/_framework/base/packets/packets.py7
-rw-r--r--src/python/src/_framework/base/util.py15
-rw-r--r--src/python/src/_framework/face/_calls.py11
-rw-r--r--src/python/src/_framework/face/_control.py28
-rw-r--r--src/python/src/_framework/face/interfaces.py43
-rw-r--r--src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py24
-rw-r--r--test/core/iomgr/poll_kick_posix_test.c1
-rwxr-xr-xtools/run_tests/build_python.sh3
26 files changed, 249 insertions, 213 deletions
diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h
index 58fce64ff1..2bf5348315 100644
--- a/include/grpc/support/port_platform.h
+++ b/include/grpc/support/port_platform.h
@@ -56,6 +56,8 @@
#define GPR_CPU_LINUX 1
#define GPR_GCC_SYNC 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
+#define GPR_POSIX_WAKEUP_FD 1
+#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1
@@ -68,7 +70,7 @@
#define GPR_GCC_ATOMIC 1
#define GPR_LINUX 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
-#define GPR_POSIX_HAS_SPECIAL_WAKEUP_FD 1
+#define GPR_POSIX_WAKEUP_FD 1
#define GPR_LINUX_EVENTFD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
@@ -86,6 +88,8 @@
#define GPR_GCC_ATOMIC 1
#define GPR_POSIX_LOG 1
#define GPR_POSIX_MULTIPOLL_WITH_POLL 1
+#define GPR_POSIX_WAKEUP_FD 1
+#define GPR_POSIX_NO_SPECIAL_WAKEUP_FD 1
#define GPR_POSIX_SOCKET 1
#define GPR_POSIX_SOCKETADDR 1
#define GPR_POSIX_SOCKETUTILS 1
diff --git a/src/core/iomgr/pollset_kick.c b/src/core/iomgr/pollset_kick.c
index 42b110d124..238ec75c61 100644
--- a/src/core/iomgr/pollset_kick.c
+++ b/src/core/iomgr/pollset_kick.c
@@ -138,6 +138,7 @@ void grpc_pollset_kick_kick(grpc_pollset_kick_state *kick_state) {
}
void grpc_pollset_kick_global_init_fallback_fd(void) {
+ gpr_mu_init(&fd_freelist_mu);
grpc_wakeup_fd_global_init_force_fallback();
}
diff --git a/src/core/iomgr/wakeup_fd_nospecial.c b/src/core/iomgr/wakeup_fd_nospecial.c
index 21e8074d50..c1038bf379 100644
--- a/src/core/iomgr/wakeup_fd_nospecial.c
+++ b/src/core/iomgr/wakeup_fd_nospecial.c
@@ -38,16 +38,17 @@
#include <grpc/support/port_platform.h>
-#ifndef GPR_POSIX_HAS_SPECIAL_WAKEUP_FD
+#ifdef GPR_POSIX_NO_SPECIAL_WAKEUP_FD
-#include "src/core/iomgr/wakeup_fd.h"
+#include "src/core/iomgr/wakeup_fd_posix.h"
+#include <stddef.h>
static int check_availability_invalid(void) {
return 0;
}
-const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable = {
+const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable = {
NULL, NULL, NULL, NULL, check_availability_invalid
};
-#endif /* GPR_POSIX_HAS_SPECIAL_WAKEUP */
+#endif /* GPR_POSIX_NO_SPECIAL_WAKEUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_pipe.c b/src/core/iomgr/wakeup_fd_pipe.c
index f36e6eeb9f..f895478990 100644
--- a/src/core/iomgr/wakeup_fd_pipe.c
+++ b/src/core/iomgr/wakeup_fd_pipe.c
@@ -31,7 +31,10 @@
*
*/
-/* TODO(klempner): Allow this code to be disabled. */
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_WAKEUP_FD
+
#include "src/core/iomgr/wakeup_fd_posix.h"
#include <errno.h>
@@ -87,7 +90,8 @@ static int pipe_check_availability(void) {
return 1;
}
-const grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable = {
+const grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable = {
pipe_create, pipe_consume, pipe_wakeup, pipe_destroy, pipe_check_availability
};
+#endif /* GPR_POSIX_WAKUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_pipe.h b/src/core/iomgr/wakeup_fd_pipe.h
index fc2898f570..a2fcde5b55 100644
--- a/src/core/iomgr/wakeup_fd_pipe.h
+++ b/src/core/iomgr/wakeup_fd_pipe.h
@@ -36,6 +36,6 @@
#include "src/core/iomgr/wakeup_fd_posix.h"
-extern grpc_wakeup_fd_vtable pipe_wakeup_fd_vtable;
+extern grpc_wakeup_fd_vtable grpc_pipe_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_PIPE_H_ */
diff --git a/src/core/iomgr/wakeup_fd_posix.c b/src/core/iomgr/wakeup_fd_posix.c
index 9107cf37b1..d3cc3ec570 100644
--- a/src/core/iomgr/wakeup_fd_posix.c
+++ b/src/core/iomgr/wakeup_fd_posix.c
@@ -31,6 +31,10 @@
*
*/
+#include <grpc/support/port_platform.h>
+
+#ifdef GPR_POSIX_WAKEUP_FD
+
#include "src/core/iomgr/wakeup_fd_posix.h"
#include "src/core/iomgr/wakeup_fd_pipe.h"
#include <stddef.h>
@@ -38,15 +42,15 @@
static const grpc_wakeup_fd_vtable *wakeup_fd_vtable = NULL;
void grpc_wakeup_fd_global_init(void) {
- if (specialized_wakeup_fd_vtable.check_availability()) {
- wakeup_fd_vtable = &specialized_wakeup_fd_vtable;
+ if (grpc_specialized_wakeup_fd_vtable.check_availability()) {
+ wakeup_fd_vtable = &grpc_specialized_wakeup_fd_vtable;
} else {
- wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
+ wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
}
}
void grpc_wakeup_fd_global_init_force_fallback(void) {
- wakeup_fd_vtable = &pipe_wakeup_fd_vtable;
+ wakeup_fd_vtable = &grpc_pipe_wakeup_fd_vtable;
}
void grpc_wakeup_fd_global_destroy(void) {
@@ -68,3 +72,5 @@ void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info) {
void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info) {
wakeup_fd_vtable->destroy(fd_info);
}
+
+#endif /* GPR_POSIX_WAKEUP_FD */
diff --git a/src/core/iomgr/wakeup_fd_posix.h b/src/core/iomgr/wakeup_fd_posix.h
index c2769afb2a..75bb9fc766 100644
--- a/src/core/iomgr/wakeup_fd_posix.h
+++ b/src/core/iomgr/wakeup_fd_posix.h
@@ -62,29 +62,14 @@
#ifndef __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
#define __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_
-typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
-
void grpc_wakeup_fd_global_init(void);
void grpc_wakeup_fd_global_destroy(void);
-
-void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
-void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
-
-#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
-
/* Force using the fallback implementation. This is intended for testing
* purposes only.*/
void grpc_wakeup_fd_global_init_force_fallback(void);
-/* Private structures; don't access their fields directly outside of wakeup fd
- * code. */
-struct grpc_wakeup_fd_info {
- int read_fd;
- int write_fd;
-};
+typedef struct grpc_wakeup_fd_info grpc_wakeup_fd_info;
typedef struct grpc_wakeup_fd_vtable {
void (*create)(grpc_wakeup_fd_info *fd_info);
@@ -95,8 +80,20 @@ typedef struct grpc_wakeup_fd_vtable {
int (*check_availability)(void);
} grpc_wakeup_fd_vtable;
+struct grpc_wakeup_fd_info {
+ int read_fd;
+ int write_fd;
+};
+
+#define GRPC_WAKEUP_FD_GET_READ_FD(fd_info) ((fd_info)->read_fd)
+
+void grpc_wakeup_fd_create(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_consume_wakeup(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_wakeup(grpc_wakeup_fd_info *fd_info);
+void grpc_wakeup_fd_destroy(grpc_wakeup_fd_info *fd_info);
+
/* Defined in some specialized implementation's .c file, or by
* wakeup_fd_nospecial.c if no such implementation exists. */
-extern const grpc_wakeup_fd_vtable specialized_wakeup_fd_vtable;
+extern const grpc_wakeup_fd_vtable grpc_specialized_wakeup_fd_vtable;
#endif /* __GRPC_INTERNAL_IOMGR_WAKEUP_FD_POSIX_H_ */
diff --git a/src/core/support/log_posix.c b/src/core/support/log_posix.c
index 1292c9e8c3..ab2d2e5a74 100644
--- a/src/core/support/log_posix.c
+++ b/src/core/support/log_posix.c
@@ -64,7 +64,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
va_end(args);
if (ret < 0) {
message = NULL;
- } else if (ret <= sizeof(buf) - 1) {
+ } else if ((size_t)ret <= sizeof(buf) - 1) {
message = buf;
} else {
message = allocated = gpr_malloc(ret + 1);
diff --git a/src/python/src/_adapter/_links_test.py b/src/python/src/_adapter/_links_test.py
index 94f17d007b..8341460a9a 100644
--- a/src/python/src/_adapter/_links_test.py
+++ b/src/python/src/_adapter/_links_test.py
@@ -80,8 +80,8 @@ class RoundTripTest(unittest.TestCase):
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
- test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
- None, None, _TIMEOUT)
+ test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
@@ -133,8 +133,9 @@ class RoundTripTest(unittest.TestCase):
rear_link.start()
front_to_back_ticket = tickets.FrontToBackPacket(
- test_operation_id, 0, tickets.Kind.ENTIRE, test_method, interfaces.FULL,
- None, test_front_to_back_datum, _TIMEOUT)
+ test_operation_id, 0, tickets.Kind.ENTIRE, test_method,
+ interfaces.ServicedSubscription.Kind.FULL, None,
+ test_front_to_back_datum, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with test_fore_link.condition:
@@ -196,7 +197,7 @@ class RoundTripTest(unittest.TestCase):
commencement_ticket = tickets.FrontToBackPacket(
test_operation_id, 0, tickets.Kind.COMMENCEMENT, test_method,
- interfaces.FULL, None, None, _TIMEOUT)
+ interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
fore_sequence_number = 1
rear_link.accept_front_to_back_ticket(commencement_ticket)
for request in scenario.requests():
diff --git a/src/python/src/_adapter/_lonely_rear_link_test.py b/src/python/src/_adapter/_lonely_rear_link_test.py
index 18279e05a2..7ccdb0b530 100644
--- a/src/python/src/_adapter/_lonely_rear_link_test.py
+++ b/src/python/src/_adapter/_lonely_rear_link_test.py
@@ -69,7 +69,7 @@ class LonelyRearLinkTest(unittest.TestCase):
front_to_back_ticket = packets.FrontToBackPacket(
test_operation_id, 0, front_to_back_ticket_kind, test_method,
- interfaces.FULL, None, None, _TIMEOUT)
+ interfaces.ServicedSubscription.Kind.FULL, None, None, _TIMEOUT)
rear_link.accept_front_to_back_ticket(front_to_back_ticket)
with fore_link.condition:
diff --git a/src/python/src/_adapter/fore.py b/src/python/src/_adapter/fore.py
index 0f00957938..c307e7ce63 100644
--- a/src/python/src/_adapter/fore.py
+++ b/src/python/src/_adapter/fore.py
@@ -116,7 +116,8 @@ class ForeLink(ticket_interfaces.ForeLink):
self._response_serializers[method])
ticket = tickets.FrontToBackPacket(
- call, 0, tickets.Kind.COMMENCEMENT, method, interfaces.FULL, None, None,
+ call, 0, tickets.Kind.COMMENCEMENT, method,
+ interfaces.ServicedSubscription.Kind.FULL, None, None,
service_acceptance.deadline - time.time())
self._rear_link.accept_front_to_back_ticket(ticket)
diff --git a/src/python/src/_framework/base/interfaces.py b/src/python/src/_framework/base/interfaces.py
index de7137cbf7..70030e564d 100644
--- a/src/python/src/_framework/base/interfaces.py
+++ b/src/python/src/_framework/base/interfaces.py
@@ -29,27 +29,24 @@
"""Interfaces defined and used by the base layer of RPC Framework."""
-# TODO(nathaniel): Use Python's new enum library for enumerated types rather
-# than constants merely placed close together.
-
import abc
+import enum
# stream is referenced from specification in this module.
from _framework.foundation import stream # pylint: disable=unused-import
-# Operation outcomes.
-COMPLETED = 'completed'
-CANCELLED = 'cancelled'
-EXPIRED = 'expired'
-RECEPTION_FAILURE = 'reception failure'
-TRANSMISSION_FAILURE = 'transmission failure'
-SERVICER_FAILURE = 'servicer failure'
-SERVICED_FAILURE = 'serviced failure'
-# Subscription categories.
-FULL = 'full'
-TERMINATION_ONLY = 'termination only'
-NONE = 'none'
+@enum.unique
+class Outcome(enum.Enum):
+ """Operation outcomes."""
+
+ COMPLETED = 'completed'
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ RECEPTION_FAILURE = 'reception failure'
+ TRANSMISSION_FAILURE = 'transmission failure'
+ SERVICER_FAILURE = 'servicer failure'
+ SERVICED_FAILURE = 'serviced failure'
class OperationContext(object):
@@ -70,9 +67,7 @@ class OperationContext(object):
"""Adds a function to be called upon operation termination.
Args:
- callback: A callable that will be passed one of COMPLETED, CANCELLED,
- EXPIRED, RECEPTION_FAILURE, TRANSMISSION_FAILURE, SERVICER_FAILURE, or
- SERVICED_FAILURE.
+ callback: A callable that will be passed an Outcome value.
"""
raise NotImplementedError()
@@ -167,11 +162,20 @@ class ServicedSubscription(object):
"""A sum type representing a serviced's interest in an operation.
Attributes:
- category: One of FULL, TERMINATION_ONLY, or NONE.
- ingestor: A ServicedIngestor. Must be present if category is FULL.
+ kind: A Kind value.
+ ingestor: A ServicedIngestor. Must be present if kind is Kind.FULL. Must
+ be None if kind is Kind.TERMINATION_ONLY or Kind.NONE.
"""
__metaclass__ = abc.ABCMeta
+ @enum.unique
+ class Kind(enum.Enum):
+ """Kinds of subscription."""
+
+ FULL = 'full'
+ TERMINATION_ONLY = 'termination only'
+ NONE = 'none'
+
class End(object):
"""Common type for entry-point objects on both sides of an operation."""
@@ -182,9 +186,8 @@ class End(object):
"""Reports the number of terminated operations broken down by outcome.
Returns:
- A dictionary from operation outcome constant (COMPLETED, CANCELLED,
- EXPIRED, and so on) to an integer representing the number of operations
- that terminated with that outcome.
+ A dictionary from Outcome value to an integer identifying the number
+ of operations that terminated with that outcome.
"""
raise NotImplementedError()
diff --git a/src/python/src/_framework/base/interfaces_test.py b/src/python/src/_framework/base/interfaces_test.py
index 6eb07ea505..8e26d884ec 100644
--- a/src/python/src/_framework/base/interfaces_test.py
+++ b/src/python/src/_framework/base/interfaces_test.py
@@ -49,13 +49,13 @@ TRIGGERED_FAILURE = 'triggered failure'
WAIT_ON_CONDITION = 'wait on condition'
EMPTY_OUTCOME_DICT = {
- interfaces.COMPLETED: 0,
- interfaces.CANCELLED: 0,
- interfaces.EXPIRED: 0,
- interfaces.RECEPTION_FAILURE: 0,
- interfaces.TRANSMISSION_FAILURE: 0,
- interfaces.SERVICER_FAILURE: 0,
- interfaces.SERVICED_FAILURE: 0,
+ interfaces.Outcome.COMPLETED: 0,
+ interfaces.Outcome.CANCELLED: 0,
+ interfaces.Outcome.EXPIRED: 0,
+ interfaces.Outcome.RECEPTION_FAILURE: 0,
+ interfaces.Outcome.TRANSMISSION_FAILURE: 0,
+ interfaces.Outcome.SERVICER_FAILURE: 0,
+ interfaces.Outcome.SERVICED_FAILURE: 0,
}
@@ -169,7 +169,8 @@ class FrontAndBackTest(object):
SYNCHRONOUS_ECHO, None, True, SMALL_TIMEOUT,
util.none_serviced_subscription(), 'test trace ID')
util.wait_for_idle(self.front)
- self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
+ self.assertEqual(
+ 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
# Assuming nothing really pathological (such as pauses on the order of
# SMALL_TIMEOUT interfering with this test) there are a two different ways
@@ -183,7 +184,7 @@ class FrontAndBackTest(object):
first_back_possibility = EMPTY_OUTCOME_DICT
# (2) The packet arrived at the back and the back completed the operation.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
- second_back_possibility[interfaces.COMPLETED] = 1
+ second_back_possibility[interfaces.Outcome.COMPLETED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
# It's true that if the packet had arrived at the back and the back had
@@ -204,8 +205,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
- self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
- self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
+ self.assertEqual(
+ 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
+ self.assertEqual(
+ 1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual([(test_payload, True)], test_consumer.calls)
def testBidirectionalStreamingEcho(self):
@@ -226,8 +229,10 @@ class FrontAndBackTest(object):
util.wait_for_idle(self.front)
util.wait_for_idle(self.back)
- self.assertEqual(1, self.front.operation_stats()[interfaces.COMPLETED])
- self.assertEqual(1, self.back.operation_stats()[interfaces.COMPLETED])
+ self.assertEqual(
+ 1, self.front.operation_stats()[interfaces.Outcome.COMPLETED])
+ self.assertEqual(
+ 1, self.back.operation_stats()[interfaces.Outcome.COMPLETED])
self.assertListEqual(test_payloads, test_consumer.values())
def testCancellation(self):
@@ -242,7 +247,8 @@ class FrontAndBackTest(object):
operation.cancel()
util.wait_for_idle(self.front)
- self.assertEqual(1, self.front.operation_stats()[interfaces.CANCELLED])
+ self.assertEqual(
+ 1, self.front.operation_stats()[interfaces.Outcome.CANCELLED])
util.wait_for_idle(self.back)
self.assertListEqual([], test_consumer.calls)
@@ -260,7 +266,7 @@ class FrontAndBackTest(object):
# The back started processing based on the first packet and then stopped
# upon receiving the cancellation packet.
second_back_possibility = dict(EMPTY_OUTCOME_DICT)
- second_back_possibility[interfaces.CANCELLED] = 1
+ second_back_possibility[interfaces.Outcome.CANCELLED] = 1
self.assertIn(
back_operation_stats, (first_back_possibility, second_back_possibility))
@@ -292,8 +298,10 @@ class FrontAndBackTest(object):
duration = termination_time_cell[0] - start_time
self.assertLessEqual(timeout, duration)
self.assertLess(duration, timeout + allowance)
- self.assertEqual(interfaces.EXPIRED, outcome_cell[0])
+ self.assertEqual(interfaces.Outcome.EXPIRED, outcome_cell[0])
util.wait_for_idle(self.front)
- self.assertEqual(1, self.front.operation_stats()[interfaces.EXPIRED])
+ self.assertEqual(
+ 1, self.front.operation_stats()[interfaces.Outcome.EXPIRED])
util.wait_for_idle(self.back)
- self.assertLessEqual(1, self.back.operation_stats()[interfaces.EXPIRED])
+ self.assertLessEqual(
+ 1, self.back.operation_stats()[interfaces.Outcome.EXPIRED])
diff --git a/src/python/src/_framework/base/packets/_ends.py b/src/python/src/_framework/base/packets/_ends.py
index baaf5cacf9..b1d16451e2 100644
--- a/src/python/src/_framework/base/packets/_ends.py
+++ b/src/python/src/_framework/base/packets/_ends.py
@@ -51,13 +51,13 @@ from _framework.foundation import callable_util
_IDLE_ACTION_EXCEPTION_LOG_MESSAGE = 'Exception calling idle action!'
_OPERATION_OUTCOMES = (
- base_interfaces.COMPLETED,
- base_interfaces.CANCELLED,
- base_interfaces.EXPIRED,
- base_interfaces.RECEPTION_FAILURE,
- base_interfaces.TRANSMISSION_FAILURE,
- base_interfaces.SERVICER_FAILURE,
- base_interfaces.SERVICED_FAILURE,
+ base_interfaces.Outcome.COMPLETED,
+ base_interfaces.Outcome.CANCELLED,
+ base_interfaces.Outcome.EXPIRED,
+ base_interfaces.Outcome.RECEPTION_FAILURE,
+ base_interfaces.Outcome.TRANSMISSION_FAILURE,
+ base_interfaces.Outcome.SERVICER_FAILURE,
+ base_interfaces.Outcome.SERVICED_FAILURE,
)
@@ -193,10 +193,10 @@ def _front_operate(
lock = threading.Lock()
with lock:
termination_manager = _termination.front_termination_manager(
- work_pool, utility_pool, termination_action, subscription.category)
+ work_pool, utility_pool, termination_action, subscription.kind)
transmission_manager = _transmission.front_transmission_manager(
lock, transmission_pool, callback, operation_id, name,
- subscription.category, trace_id, timeout, termination_manager)
+ subscription.kind, trace_id, timeout, termination_manager)
operation_context = _context.OperationContext(
lock, operation_id, packets.Kind.SERVICED_FAILURE,
termination_manager, transmission_manager)
@@ -225,9 +225,10 @@ def _front_operate(
transmission_manager.inmit(payload, complete)
- returned_reception_manager = (
- None if subscription.category == base_interfaces.NONE
- else reception_manager)
+ if subscription.kind is base_interfaces.ServicedSubscription.Kind.NONE:
+ returned_reception_manager = None
+ else:
+ returned_reception_manager = reception_manager
return _FrontManagement(
returned_reception_manager, emission_manager, operation_context,
diff --git a/src/python/src/_framework/base/packets/_ingestion.py b/src/python/src/_framework/base/packets/_ingestion.py
index ad5ed4cada..abc1e7a043 100644
--- a/src/python/src/_framework/base/packets/_ingestion.py
+++ b/src/python/src/_framework/base/packets/_ingestion.py
@@ -111,7 +111,7 @@ class _FrontConsumerCreator(_ConsumerCreator):
def create_consumer(self, requirement):
"""See _ConsumerCreator.create_consumer for specification."""
- if self._subscription.category == interfaces.FULL:
+ if self._subscription.kind is interfaces.ServicedSubscription.Kind.FULL:
try:
return _ConsumerCreation(
self._subscription.ingestor.consumer(self._operation_context),
diff --git a/src/python/src/_framework/base/packets/_interfaces.py b/src/python/src/_framework/base/packets/_interfaces.py
index 5f6c0593d0..d1bda95bf7 100644
--- a/src/python/src/_framework/base/packets/_interfaces.py
+++ b/src/python/src/_framework/base/packets/_interfaces.py
@@ -58,10 +58,7 @@ class TerminationManager(object):
immediately.
Args:
- callback: A callable that will be passed one of base_interfaces.COMPLETED,
- base_interfaces.CANCELLED, base_interfaces.EXPIRED,
- base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
- base_interfaces.SERVICER_FAILURE, or base_interfaces.SERVICED_FAILURE.
+ callback: A callable that will be passed a base_interfaces.Outcome value.
"""
raise NotImplementedError()
diff --git a/src/python/src/_framework/base/packets/_termination.py b/src/python/src/_framework/base/packets/_termination.py
index d586c2167b..ae3ba1c16f 100644
--- a/src/python/src/_framework/base/packets/_termination.py
+++ b/src/python/src/_framework/base/packets/_termination.py
@@ -29,6 +29,8 @@
"""State and behavior for operation termination."""
+import enum
+
from _framework.base import interfaces
from _framework.base.packets import _constants
from _framework.base.packets import _interfaces
@@ -37,26 +39,32 @@ from _framework.foundation import callable_util
_CALLBACK_EXCEPTION_LOG_MESSAGE = 'Exception calling termination callback!'
-# TODO(nathaniel): enum module.
-_EMISSION = 'emission'
-_TRANSMISSION = 'transmission'
-_INGESTION = 'ingestion'
-
-_FRONT_NOT_LISTENING_REQUIREMENTS = (_TRANSMISSION,)
-_BACK_NOT_LISTENING_REQUIREMENTS = (_EMISSION, _INGESTION,)
-_LISTENING_REQUIREMENTS = (_TRANSMISSION, _INGESTION,)
-
_KINDS_TO_OUTCOMES = {
- packets.Kind.COMPLETION: interfaces.COMPLETED,
- packets.Kind.CANCELLATION: interfaces.CANCELLED,
- packets.Kind.EXPIRATION: interfaces.EXPIRED,
- packets.Kind.RECEPTION_FAILURE: interfaces.RECEPTION_FAILURE,
- packets.Kind.TRANSMISSION_FAILURE: interfaces.TRANSMISSION_FAILURE,
- packets.Kind.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
- packets.Kind.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
+ packets.Kind.COMPLETION: interfaces.Outcome.COMPLETED,
+ packets.Kind.CANCELLATION: interfaces.Outcome.CANCELLED,
+ packets.Kind.EXPIRATION: interfaces.Outcome.EXPIRED,
+ packets.Kind.RECEPTION_FAILURE: interfaces.Outcome.RECEPTION_FAILURE,
+ packets.Kind.TRANSMISSION_FAILURE: interfaces.Outcome.TRANSMISSION_FAILURE,
+ packets.Kind.SERVICER_FAILURE: interfaces.Outcome.SERVICER_FAILURE,
+ packets.Kind.SERVICED_FAILURE: interfaces.Outcome.SERVICED_FAILURE,
}
+@enum.unique
+class _Requirement(enum.Enum):
+ """Symbols indicating events required for termination."""
+
+ EMISSION = 'emission'
+ TRANSMISSION = 'transmission'
+ INGESTION = 'ingestion'
+
+_FRONT_NOT_LISTENING_REQUIREMENTS = (_Requirement.TRANSMISSION,)
+_BACK_NOT_LISTENING_REQUIREMENTS = (
+ _Requirement.EMISSION, _Requirement.INGESTION,)
+_LISTENING_REQUIREMENTS = (
+ _Requirement.TRANSMISSION, _Requirement.INGESTION,)
+
+
class _TerminationManager(_interfaces.TerminationManager):
"""An implementation of _interfaces.TerminationManager."""
@@ -68,9 +76,8 @@ class _TerminationManager(_interfaces.TerminationManager):
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
- requirements: A combination of _EMISSION, _TRANSMISSION, and _INGESTION
- identifying what must finish for the operation to be considered
- completed.
+ requirements: A combination of _Requirement values identifying what
+ must finish for the operation to be considered completed.
local_failure: A packets.Kind specifying what constitutes local failure of
customer work.
"""
@@ -137,21 +144,21 @@ class _TerminationManager(_interfaces.TerminationManager):
def emission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
- self._outstanding_requirements.discard(_EMISSION)
+ self._outstanding_requirements.discard(_Requirement.EMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def transmission_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
- self._outstanding_requirements.discard(_TRANSMISSION)
+ self._outstanding_requirements.discard(_Requirement.TRANSMISSION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
def ingestion_complete(self):
"""See superclass method for specification."""
if self._outstanding_requirements is not None:
- self._outstanding_requirements.discard(_INGESTION)
+ self._outstanding_requirements.discard(_Requirement.INGESTION)
if not self._outstanding_requirements:
self._terminate(packets.Kind.COMPLETION)
@@ -163,39 +170,46 @@ class _TerminationManager(_interfaces.TerminationManager):
self._terminate(kind)
-def front_termination_manager(work_pool, utility_pool, action, subscription):
+def front_termination_manager(
+ work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for front-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
- subscription: One of interfaces.FULL, interfaces.termination_only, or
- interfaces.NONE.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value.
Returns:
A TerminationManager appropriate for front-side use.
"""
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
+ requirements = _FRONT_NOT_LISTENING_REQUIREMENTS
+ else:
+ requirements = _LISTENING_REQUIREMENTS
+
return _TerminationManager(
- work_pool, utility_pool, action,
- _FRONT_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
- _LISTENING_REQUIREMENTS, packets.Kind.SERVICED_FAILURE)
+ work_pool, utility_pool, action, requirements,
+ packets.Kind.SERVICED_FAILURE)
-def back_termination_manager(work_pool, utility_pool, action, subscription):
+def back_termination_manager(work_pool, utility_pool, action, subscription_kind):
"""Creates a TerminationManager appropriate for back-side use.
Args:
work_pool: A thread pool in which customer work will be done.
utility_pool: A thread pool in which work utility work will be done.
action: An action to call on operation termination.
- subscription: One of interfaces.FULL, interfaces.termination_only, or
- interfaces.NONE.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value.
Returns:
A TerminationManager appropriate for back-side use.
"""
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
+ requirements = _BACK_NOT_LISTENING_REQUIREMENTS
+ else:
+ requirements = _LISTENING_REQUIREMENTS
+
return _TerminationManager(
- work_pool, utility_pool, action,
- _BACK_NOT_LISTENING_REQUIREMENTS if subscription == interfaces.NONE else
- _LISTENING_REQUIREMENTS, packets.Kind.SERVICER_FAILURE)
+ work_pool, utility_pool, action, requirements,
+ packets.Kind.SERVICER_FAILURE)
diff --git a/src/python/src/_framework/base/packets/_transmission.py b/src/python/src/_framework/base/packets/_transmission.py
index 006128774d..24fe6e6164 100644
--- a/src/python/src/_framework/base/packets/_transmission.py
+++ b/src/python/src/_framework/base/packets/_transmission.py
@@ -91,20 +91,19 @@ class _Packetizer(object):
class _FrontPacketizer(_Packetizer):
"""Front-side packet-creating behavior."""
- def __init__(self, name, subscription, trace_id, timeout):
+ def __init__(self, name, subscription_kind, trace_id, timeout):
"""Constructor.
Args:
name: The name of the operation.
- subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
- interfaces.NONE describing the interest the front has in packets sent
- from the back.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in packets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
"""
self._name = name
- self._subscription = subscription
+ self._subscription_kind = subscription_kind
self._trace_id = trace_id
self._timeout = timeout
@@ -114,13 +113,13 @@ class _FrontPacketizer(_Packetizer):
return packets.FrontToBackPacket(
operation_id, sequence_number,
packets.Kind.COMPLETION if complete else packets.Kind.CONTINUATION,
- self._name, self._subscription, self._trace_id, payload,
+ self._name, self._subscription_kind, self._trace_id, payload,
self._timeout)
else:
return packets.FrontToBackPacket(
operation_id, 0,
packets.Kind.ENTIRE if complete else packets.Kind.COMMENCEMENT,
- self._name, self._subscription, self._trace_id, payload,
+ self._name, self._subscription_kind, self._trace_id, payload,
self._timeout)
def packetize_abortion(self, operation_id, sequence_number, kind):
@@ -335,8 +334,8 @@ class _TransmittingTransmissionManager(TransmissionManager):
def front_transmission_manager(
- lock, pool, callback, operation_id, name, subscription, trace_id, timeout,
- termination_manager):
+ lock, pool, callback, operation_id, name, subscription_kind, trace_id,
+ timeout, termination_manager):
"""Creates a TransmissionManager appropriate for front-side use.
Args:
@@ -347,9 +346,8 @@ def front_transmission_manager(
of the operation.
operation_id: The operation's ID.
name: The name of the operation.
- subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
- interfaces.NONE describing the interest the front has in packets sent
- from the back.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in packets sent from the back.
trace_id: A uuid.UUID identifying a set of related operations to which
this operation belongs.
timeout: A length of time in seconds to allow for the entire operation.
@@ -361,12 +359,13 @@ def front_transmission_manager(
"""
return _TransmittingTransmissionManager(
lock, pool, callback, operation_id, _FrontPacketizer(
- name, subscription, trace_id, timeout),
+ name, subscription_kind, trace_id, timeout),
termination_manager)
def back_transmission_manager(
- lock, pool, callback, operation_id, termination_manager, subscription):
+ lock, pool, callback, operation_id, termination_manager,
+ subscription_kind):
"""Creates a TransmissionManager appropriate for back-side use.
Args:
@@ -378,14 +377,13 @@ def back_transmission_manager(
operation_id: The operation's ID.
termination_manager: The _interfaces.TerminationManager associated with
this operation.
- subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
- interfaces.NONE describing the interest the front has in packets sent from
- the back.
+ subscription_kind: An interfaces.ServicedSubscription.Kind value
+ describing the interest the front has in packets sent from the back.
Returns:
A TransmissionManager appropriate for back-side use.
"""
- if subscription == interfaces.NONE:
+ if subscription_kind is interfaces.ServicedSubscription.Kind.NONE:
return _EmptyTransmissionManager()
else:
return _TransmittingTransmissionManager(
diff --git a/src/python/src/_framework/base/packets/packets.py b/src/python/src/_framework/base/packets/packets.py
index 1315ca650e..f7503bdcd6 100644
--- a/src/python/src/_framework/base/packets/packets.py
+++ b/src/python/src/_framework/base/packets/packets.py
@@ -71,10 +71,9 @@ class FrontToBackPacket(
Kind.RECEPTION_FAILURE, or Kind.TRANSMISSION_FAILURE.
name: The name of an operation. Must be present if kind is Kind.COMMENCEMENT
or Kind.ENTIRE. Must be None for any other kind.
- subscription: One of interfaces.FULL, interfaces.TERMINATION_ONLY, or
- interfaces.NONE describing the interest the front has in packets sent from
- the back. Must be present if kind is Kind.COMMENCEMENT or Kind.ENTIRE.
- Must be None for any other kind.
+ subscription: An interfaces.ServicedSubscription.Kind value describing the
+ interest the front has in packets sent from the back. Must be present if
+ kind is Kind.COMMENCEMENT or Kind.ENTIRE. Must be None for any other kind.
trace_id: A uuid.UUID identifying a set of related operations to which this
operation belongs. May be None.
payload: A customer payload object. Must be present if kind is
diff --git a/src/python/src/_framework/base/util.py b/src/python/src/_framework/base/util.py
index 6bbd18a59a..35ce0443fc 100644
--- a/src/python/src/_framework/base/util.py
+++ b/src/python/src/_framework/base/util.py
@@ -36,13 +36,14 @@ from _framework.base import interfaces
class _ServicedSubscription(
- collections.namedtuple('_ServicedSubscription', ['category', 'ingestor']),
+ collections.namedtuple('_ServicedSubscription', ['kind', 'ingestor']),
interfaces.ServicedSubscription):
"""See interfaces.ServicedSubscription for specification."""
-_NONE_SUBSCRIPTION = _ServicedSubscription(interfaces.NONE, None)
+_NONE_SUBSCRIPTION = _ServicedSubscription(
+ interfaces.ServicedSubscription.Kind.NONE, None)
_TERMINATION_ONLY_SUBSCRIPTION = _ServicedSubscription(
- interfaces.TERMINATION_ONLY, None)
+ interfaces.ServicedSubscription.Kind.TERMINATION_ONLY, None)
def none_serviced_subscription():
@@ -72,12 +73,14 @@ def full_serviced_subscription(ingestor):
"""Creates a "full" interfaces.ServicedSubscription object.
Args:
- ingestor: A ServicedIngestor.
+ ingestor: An interfaces.ServicedIngestor.
Returns:
- A ServicedSubscription object indicating a full subscription.
+ An interfaces.ServicedSubscription object indicating a full
+ subscription.
"""
- return _ServicedSubscription(interfaces.FULL, ingestor)
+ return _ServicedSubscription(
+ interfaces.ServicedSubscription.Kind.FULL, ingestor)
def wait_for_idle(end):
diff --git a/src/python/src/_framework/face/_calls.py b/src/python/src/_framework/face/_calls.py
index ab58e6378b..9128aef7c4 100644
--- a/src/python/src/_framework/face/_calls.py
+++ b/src/python/src/_framework/face/_calls.py
@@ -94,7 +94,7 @@ class _OperationCancellableIterator(interfaces.CancellableIterator):
def cancel(self):
self._operation.cancel()
- self._rendezvous.set_outcome(base_interfaces.CANCELLED)
+ self._rendezvous.set_outcome(base_interfaces.Outcome.CANCELLED)
class _OperationFuture(future.Future):
@@ -150,15 +150,12 @@ class _OperationFuture(future.Future):
"""Indicates to this object that the operation has terminated.
Args:
- operation_outcome: One of base_interfaces.COMPLETED,
- base_interfaces.CANCELLED, base_interfaces.EXPIRED,
- base_interfaces.RECEPTION_FAILURE, base_interfaces.TRANSMISSION_FAILURE,
- base_interfaces.SERVICED_FAILURE, or base_interfaces.SERVICER_FAILURE
- indicating the categorical outcome of the operation.
+ operation_outcome: A base_interfaces.Outcome value indicating the
+ outcome of the operation.
"""
with self._condition:
if (self._outcome is None and
- operation_outcome != base_interfaces.COMPLETED):
+ operation_outcome is not base_interfaces.Outcome.COMPLETED):
self._outcome = future.raised(
_control.abortion_outcome_to_exception(operation_outcome))
self._condition.notify_all()
diff --git a/src/python/src/_framework/face/_control.py b/src/python/src/_framework/face/_control.py
index 2c221321d6..9f1bf6d5fd 100644
--- a/src/python/src/_framework/face/_control.py
+++ b/src/python/src/_framework/face/_control.py
@@ -40,13 +40,17 @@ from _framework.foundation import stream
INTERNAL_ERROR_LOG_MESSAGE = ':-( RPC Framework (Face) Internal Error! :-('
_OPERATION_OUTCOME_TO_RPC_ABORTION = {
- base_interfaces.CANCELLED: interfaces.CANCELLED,
- base_interfaces.EXPIRED: interfaces.EXPIRED,
- base_interfaces.RECEPTION_FAILURE: interfaces.NETWORK_FAILURE,
- base_interfaces.TRANSMISSION_FAILURE: interfaces.NETWORK_FAILURE,
- base_interfaces.SERVICED_FAILURE: interfaces.SERVICED_FAILURE,
- base_interfaces.SERVICER_FAILURE: interfaces.SERVICER_FAILURE,
- }
+ base_interfaces.Outcome.CANCELLED: interfaces.Abortion.CANCELLED,
+ base_interfaces.Outcome.EXPIRED: interfaces.Abortion.EXPIRED,
+ base_interfaces.Outcome.RECEPTION_FAILURE:
+ interfaces.Abortion.NETWORK_FAILURE,
+ base_interfaces.Outcome.TRANSMISSION_FAILURE:
+ interfaces.Abortion.NETWORK_FAILURE,
+ base_interfaces.Outcome.SERVICED_FAILURE:
+ interfaces.Abortion.SERVICED_FAILURE,
+ base_interfaces.Outcome.SERVICER_FAILURE:
+ interfaces.Abortion.SERVICER_FAILURE,
+}
def _as_operation_termination_callback(rpc_abortion_callback):
@@ -59,13 +63,13 @@ def _as_operation_termination_callback(rpc_abortion_callback):
def _abortion_outcome_to_exception(abortion_outcome):
- if abortion_outcome == base_interfaces.CANCELLED:
+ if abortion_outcome == base_interfaces.Outcome.CANCELLED:
return exceptions.CancellationError()
- elif abortion_outcome == base_interfaces.EXPIRED:
+ elif abortion_outcome == base_interfaces.Outcome.EXPIRED:
return exceptions.ExpirationError()
- elif abortion_outcome == base_interfaces.SERVICER_FAILURE:
+ elif abortion_outcome == base_interfaces.Outcome.SERVICER_FAILURE:
return exceptions.ServicerError()
- elif abortion_outcome == base_interfaces.SERVICED_FAILURE:
+ elif abortion_outcome == base_interfaces.Outcome.SERVICED_FAILURE:
return exceptions.ServicedError()
else:
return exceptions.NetworkError()
@@ -133,7 +137,7 @@ class Rendezvous(stream.Consumer):
def set_outcome(self, outcome):
with self._condition:
- if outcome != base_interfaces.COMPLETED:
+ if outcome is not base_interfaces.Outcome.COMPLETED:
self._abortion = outcome
self._condition.notify()
diff --git a/src/python/src/_framework/face/interfaces.py b/src/python/src/_framework/face/interfaces.py
index 0cc7c70df3..2480454369 100644
--- a/src/python/src/_framework/face/interfaces.py
+++ b/src/python/src/_framework/face/interfaces.py
@@ -30,6 +30,7 @@
"""Interfaces for the face layer of RPC Framework."""
import abc
+import enum
# exceptions, abandonment, and future are referenced from specification in this
# module.
@@ -58,14 +59,15 @@ class CancellableIterator(object):
raise NotImplementedError()
-# Constants that categorize RPC abortion.
-# TODO(nathaniel): Learn and use Python's enum library for this de facto
-# enumerated type
-CANCELLED = 'abortion: cancelled'
-EXPIRED = 'abortion: expired'
-NETWORK_FAILURE = 'abortion: network failure'
-SERVICED_FAILURE = 'abortion: serviced failure'
-SERVICER_FAILURE = 'abortion: servicer failure'
+@enum.unique
+class Abortion(enum.Enum):
+ """Categories of RPC abortion."""
+
+ CANCELLED = 'cancelled'
+ EXPIRED = 'expired'
+ NETWORK_FAILURE = 'network failure'
+ SERVICED_FAILURE = 'serviced failure'
+ SERVICER_FAILURE = 'servicer failure'
class RpcContext(object):
@@ -93,9 +95,8 @@ class RpcContext(object):
"""Registers a callback to be called if the RPC is aborted.
Args:
- abortion_callback: A callable to be called and passed one of CANCELLED,
- EXPIRED, NETWORK_FAILURE, SERVICED_FAILURE, or SERVICER_FAILURE in the
- event of RPC abortion.
+ abortion_callback: A callable to be called and passed an Abortion value
+ in the event of RPC abortion.
"""
raise NotImplementedError()
@@ -474,9 +475,8 @@ class Stub(object):
request: The request value for the RPC.
response_callback: A callback to be called to accept the response value
of the RPC.
- abortion_callback: A callback to be called to accept one of CANCELLED,
- EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
- abortion.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@@ -494,9 +494,8 @@ class Stub(object):
request: The request value for the RPC.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
- abortion_callback: A callback to be called to accept one of CANCELLED,
- EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
- abortion.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@@ -513,9 +512,8 @@ class Stub(object):
name: The RPC method name.
response_callback: A callback to be called to accept the response value
of the RPC.
- abortion_callback: A callback to be called to accept one of CANCELLED,
- EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
- abortion.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
@@ -533,9 +531,8 @@ class Stub(object):
name: The RPC method name.
response_consumer: A stream.Consumer to be called to accept the response
values of the RPC.
- abortion_callback: A callback to be called to accept one of CANCELLED,
- EXPIRED, NETWORK_FAILURE, or SERVICER_FAILURE in the event of RPC
- abortion.
+ abortion_callback: A callback to be called and passed an Abortion value
+ in the event of RPC abortion.
timeout: A duration of time in seconds to allow for the RPC.
Returns:
diff --git a/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py b/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
index dba73a9368..cb786f500c 100644
--- a/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
+++ b/src/python/src/_framework/face/testing/event_invocation_synchronous_event_service_test_case.py
@@ -176,7 +176,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.EXPIRED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -190,7 +190,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.EXPIRED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -202,7 +202,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.EXPIRED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testExpiredStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -217,7 +217,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.consume(request)
callback.block_until_terminated()
- self.assertEqual(interfaces.EXPIRED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.EXPIRED, callback.abortion())
def testFailedUnaryRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -231,7 +231,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback.complete, callback.abort, _TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -245,7 +245,7 @@ class EventInvocationSynchronousEventServiceTestCase(
name, request, callback, callback.abort, _TIMEOUT)
callback.block_until_terminated()
- self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -262,7 +262,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate()
callback.block_until_terminated()
- self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testFailedStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -279,7 +279,7 @@ class EventInvocationSynchronousEventServiceTestCase(
request_consumer.terminate()
callback.block_until_terminated()
- self.assertEqual(interfaces.SERVICER_FAILURE, callback.abortion())
+ self.assertEqual(interfaces.Abortion.SERVICER_FAILURE, callback.abortion())
def testParallelInvocations(self):
for name, test_messages_sequence in (
@@ -321,7 +321,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
- self.assertEqual(interfaces.CANCELLED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledUnaryRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -335,7 +335,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
- self.assertEqual(interfaces.CANCELLED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestUnaryResponse(self):
for name, test_messages_sequence in (
@@ -351,7 +351,7 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
- self.assertEqual(interfaces.CANCELLED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
def testCancelledStreamRequestStreamResponse(self):
for name, test_messages_sequence in (
@@ -364,4 +364,4 @@ class EventInvocationSynchronousEventServiceTestCase(
call.cancel()
callback.block_until_terminated()
- self.assertEqual(interfaces.CANCELLED, callback.abortion())
+ self.assertEqual(interfaces.Abortion.CANCELLED, callback.abortion())
diff --git a/test/core/iomgr/poll_kick_posix_test.c b/test/core/iomgr/poll_kick_posix_test.c
index 3c6d815c9d..2c5b444d3a 100644
--- a/test/core/iomgr/poll_kick_posix_test.c
+++ b/test/core/iomgr/poll_kick_posix_test.c
@@ -105,6 +105,7 @@ static void test_over_free(void) {
grpc_pollset_kick_post_poll(&kick_state[i]);
grpc_pollset_kick_destroy(&kick_state[i]);
}
+ gpr_free(kick_state);
}
static void run_tests(void) {
diff --git a/tools/run_tests/build_python.sh b/tools/run_tests/build_python.sh
index 46e5797f62..4abb412c95 100755
--- a/tools/run_tests/build_python.sh
+++ b/tools/run_tests/build_python.sh
@@ -7,6 +7,5 @@ cd $(dirname $0)/../..
root=`pwd`
virtualenv python2.7_virtual_environment
-python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0
-python2.7_virtual_environment/bin/pip install third_party/protobuf/python
+python2.7_virtual_environment/bin/pip install enum34==1.0.4 futures==2.2.0 protobuf==2.6.1
python2.7_virtual_environment/bin/pip install src/python