aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-01 20:31:27 -0700
committerGravatar Craig Tiller <craig.tiller@gmail.com>2015-06-01 20:31:27 -0700
commit14556ecf8c59c7eb112c79e86e5c7bc0911f4478 (patch)
tree0ff7ca7402fbd8a20b5116b6d159539cc379d450 /src
parent8e0b08a33d819d0a2523ec439d545296e1ad2086 (diff)
parent3d67c7cf45f694a793be1adacafa2cfb98ecd75f (diff)
Merge branch 'count-the-things' into we-dont-need-no-backup
Conflicts: Makefile src/core/iomgr/pollset_posix.c src/core/surface/call.c src/core/surface/channel.c src/core/surface/server.c src/python/src/grpc/_adapter/_low_test.py tools/doxygen/Doxyfile.core.internal
Diffstat (limited to 'src')
-rw-r--r--src/core/census/README.md (renamed from src/python/src/grpc/_adapter/_call.h)71
-rw-r--r--src/core/census/context.c (renamed from src/python/src/grpc/_adapter/_client_credentials.h)32
-rw-r--r--src/core/census/context.h (renamed from src/python/src/grpc/_adapter/_completion_queue.h)26
-rw-r--r--src/core/census/grpc_context.c (renamed from src/python/src/grpc/_adapter/_channel.h)24
-rw-r--r--src/core/census/grpc_context.h (renamed from src/python/src/grpc/_adapter/_error.h)12
-rw-r--r--src/core/census/initialize.c (renamed from src/python/src/grpc/_adapter/_server.h)29
-rw-r--r--src/core/channel/child_channel.c11
-rw-r--r--src/core/iomgr/fd_posix.c53
-rw-r--r--src/core/iomgr/fd_posix.h10
-rw-r--r--src/core/iomgr/iomgr.c68
-rw-r--r--src/core/iomgr/iomgr.h37
-rw-r--r--src/core/iomgr/iomgr_internal.h4
-rw-r--r--src/core/iomgr/pollset_posix.c7
-rw-r--r--src/core/iomgr/pollset_windows.c10
-rw-r--r--src/core/iomgr/socket_windows.c11
-rw-r--r--src/core/iomgr/socket_windows.h4
-rw-r--r--src/core/iomgr/tcp_posix.c7
-rw-r--r--src/core/security/credentials.c12
-rw-r--r--src/core/surface/call.c12
-rw-r--r--src/core/surface/channel.c7
-rw-r--r--src/core/surface/channel_create.c3
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/secure_channel_create.c3
-rw-r--r--src/core/surface/server.c34
-rw-r--r--src/core/transport/chttp2/gen_hpack_tables.c6
-rw-r--r--src/cpp/server/thread_pool.cc5
-rw-r--r--src/cpp/server/thread_pool.h2
-rw-r--r--src/python/src/grpc/_adapter/_c.c86
-rw-r--r--src/python/src/grpc/_adapter/_c/module.c61
-rw-r--r--src/python/src/grpc/_adapter/_c/types.c60
-rw-r--r--src/python/src/grpc/_adapter/_c/types.h271
-rw-r--r--src/python/src/grpc/_adapter/_c/types/call.c163
-rw-r--r--src/python/src/grpc/_adapter/_c/types/channel.c134
-rw-r--r--src/python/src/grpc/_adapter/_c/types/client_credentials.c286
-rw-r--r--src/python/src/grpc/_adapter/_c/types/completion_queue.c124
-rw-r--r--src/python/src/grpc/_adapter/_c/types/server.c183
-rw-r--r--src/python/src/grpc/_adapter/_c/types/server_credentials.c146
-rw-r--r--src/python/src/grpc/_adapter/_c/utility.c460
-rw-r--r--src/python/src/grpc/_adapter/_c_test.py188
-rw-r--r--src/python/src/grpc/_adapter/_call.c438
-rw-r--r--src/python/src/grpc/_adapter/_channel.c135
-rw-r--r--src/python/src/grpc/_adapter/_client_credentials.c121
-rw-r--r--src/python/src/grpc/_adapter/_completion_queue.c653
-rw-r--r--src/python/src/grpc/_adapter/_datatypes.py86
-rw-r--r--src/python/src/grpc/_adapter/_error.c79
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low.py258
-rw-r--r--src/python/src/grpc/_adapter/_intermediary_low_test.py421
-rw-r--r--src/python/src/grpc/_adapter/_low.py104
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py476
-rw-r--r--src/python/src/grpc/_adapter/_server.c202
-rw-r--r--src/python/src/grpc/_adapter/_server_credentials.c152
-rw-r--r--src/python/src/grpc/_adapter/_server_credentials.h49
-rw-r--r--src/python/src/grpc/_adapter/_tag.c65
-rw-r--r--src/python/src/grpc/_adapter/_tag.h70
-rw-r--r--src/python/src/grpc/_adapter/_types.py368
-rw-r--r--src/python/src/grpc/_adapter/fore.py4
-rw-r--r--src/python/src/grpc/_adapter/rear.py4
-rw-r--r--src/python/src/setup.py18
58 files changed, 3459 insertions, 2912 deletions
diff --git a/src/python/src/grpc/_adapter/_call.h b/src/core/census/README.md
index b4cf9d7ec9..fb615a2194 100644
--- a/src/python/src/grpc/_adapter/_call.h
+++ b/src/core/census/README.md
@@ -1,5 +1,4 @@
-/*
- *
+<!---
* Copyright 2015, Google Inc.
* All rights reserved.
*
@@ -28,50 +27,50 @@
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
+-->
+
+# Census - a resource measurement and tracing system
-#ifndef _ADAPTER__CALL_H_
-#define _ADAPTER__CALL_H_
+This directory contains code for Census, which will ultimately provide the
+following features for any gRPC-using system:
+* A [dapper](http://research.google.com/pubs/pub36356.html)-like tracing
+ system, enabling tracing across a distributed infrastructure.
+* RPC statistics and measurements for key metrics, such as latency, bytes
+ transferred, number of errors etc.
+* Resource measurement framework which can be used for measuring custom
+ metrics. Through the use of [tags](#Tags), these can be broken down across
+ the entire distributed stack.
+* Easy integration of the above with
+ [Google Cloud Trace](https://cloud.google.com/tools/cloud-trace) and
+ [Google Cloud Monitoring](https://cloud.google.com/monitoring/).
-#include <Python.h>
-#include <grpc/grpc.h>
+## Concepts
-#include "grpc/_adapter/_completion_queue.h"
-#include "grpc/_adapter/_channel.h"
-#include "grpc/_adapter/_server.h"
+### Context
-typedef struct {
- PyObject_HEAD
+### Operations
- CompletionQueue *completion_queue;
- Channel *channel;
- Server *server;
+### Tags
- /* Legacy state. */
- grpc_call_details call_details;
- grpc_metadata_array recv_metadata;
- grpc_metadata_array recv_trailing_metadata;
- grpc_metadata *send_metadata;
- size_t send_metadata_count;
- grpc_metadata *send_trailing_metadata;
- size_t send_trailing_metadata_count;
- int adding_to_trailing;
+### Metrics
- grpc_byte_buffer *send_message;
- grpc_byte_buffer *recv_message;
+## API
- grpc_status_code status;
- char *status_details;
- size_t status_details_capacity;
+### Internal/RPC API
- int cancelled;
+### External/Client API
- grpc_call *c_call;
-} Call;
+### RPC API
-extern PyTypeObject pygrpc_CallType;
+## Files in this directory
-int pygrpc_add_call(PyObject *module);
+Note that files and functions in this directory can be split into two
+categories:
+* Files that define core census library functions. Functions etc. in these
+ files are named census\_\*, and constitute the core census library
+ functionality. At some time in the future, these will become a standalone
+ library.
+* Files that define functions etc. that provide a convenient interface between
+ grpc and the core census functionality. These files are all named
+ grpc\_\*.{c,h}, and define function names beginning with grpc\_census\_\*.
-#endif /* _ADAPTER__CALL_H_ */
diff --git a/src/python/src/grpc/_adapter/_client_credentials.h b/src/core/census/context.c
index fe04016d20..1358c5127b 100644
--- a/src/python/src/grpc/_adapter/_client_credentials.h
+++ b/src/core/census/context.c
@@ -31,19 +31,29 @@
*
*/
-#ifndef _ADAPTER__CLIENT_CREDENTIALS_H_
-#define _ADAPTER__CLIENT_CREDENTIALS_H_
+#include "context.h"
-#include <Python.h>
-#include <grpc/grpc_security.h>
+#include <string.h>
+#include <grpc/census.h>
+#include <grpc/support/alloc.h>
-typedef struct {
- PyObject_HEAD
- grpc_credentials *c_client_credentials;
-} ClientCredentials;
+/* Placeholder implementation only. */
-extern PyTypeObject pygrpc_ClientCredentialsType;
+size_t census_context_serialize(const census_context *context, char *buffer,
+ size_t buf_size) {
+ /* TODO(aveitch): implement serialization */
+ return 0;
+}
-int pygrpc_add_client_credentials(PyObject *module);
+int census_context_deserialize(const char *buffer, census_context **context) {
+ int ret = 0;
+ if (buffer != NULL) {
+ /* TODO(aveitch): implement deserialization. */
+ ret = 1;
+ }
+ *context = gpr_malloc(sizeof(census_context));
+ memset(*context, 0, sizeof(census_context));
+ return ret;
+}
-#endif /* _ADAPTER__CLIENT_CREDENTIALS_H_ */
+void census_context_destroy(census_context *context) { gpr_free(context); }
diff --git a/src/python/src/grpc/_adapter/_completion_queue.h b/src/core/census/context.h
index 516694daa1..d43a69f7e5 100644
--- a/src/python/src/grpc/_adapter/_completion_queue.h
+++ b/src/core/census/context.h
@@ -31,19 +31,19 @@
*
*/
-#ifndef _ADAPTER__COMPLETION_QUEUE_H_
-#define _ADAPTER__COMPLETION_QUEUE_H_
+#ifndef GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
+#define GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H
-#include <Python.h>
-#include <grpc/grpc.h>
+#include <grpc/census.h>
-typedef struct {
- PyObject_HEAD
- grpc_completion_queue *c_completion_queue;
-} CompletionQueue;
+/* census_context is the in-memory representation of information needed to
+ * maintain tracing, RPC statistics and resource usage information. */
+struct census_context {
+ gpr_uint64 op_id; /* Operation identifier - unique per-context */
+ gpr_uint64 trace_id; /* Globally unique trace identifier */
+ /* TODO(aveitch) Add census tags:
+ const census_tag_set *tags;
+ */
+};
-extern PyTypeObject pygrpc_CompletionQueueType;
-
-int pygrpc_add_completion_queue(PyObject *module);
-
-#endif /* _ADAPTER__COMPLETION_QUEUE_H_ */
+#endif /* GRPC_INTERNAL_CORE_CENSUS_CONTEXT_H */
diff --git a/src/python/src/grpc/_adapter/_channel.h b/src/core/census/grpc_context.c
index 65894939a2..cf2353199f 100644
--- a/src/python/src/grpc/_adapter/_channel.h
+++ b/src/core/census/grpc_context.c
@@ -31,19 +31,15 @@
*
*/
-#ifndef _ADAPTER__CHANNEL_H_
-#define _ADAPTER__CHANNEL_H_
+#include <grpc/census.h>
+#include "src/core/census/grpc_context.h"
-#include <Python.h>
-#include <grpc/grpc.h>
+void *grpc_census_context_create() {
+ census_context *context;
+ census_context_deserialize(NULL, &context);
+ return (void *)context;
+}
-typedef struct {
- PyObject_HEAD
- grpc_channel *c_channel;
-} Channel;
-
-extern PyTypeObject pygrpc_ChannelType;
-
-int pygrpc_add_channel(PyObject *module);
-
-#endif /* _ADAPTER__CHANNEL_H_ */
+void grpc_census_context_destroy(void *context) {
+ census_context_destroy((census_context *)context);
+}
diff --git a/src/python/src/grpc/_adapter/_error.h b/src/core/census/grpc_context.h
index 6988b1c95e..f610f6ce21 100644
--- a/src/python/src/grpc/_adapter/_error.h
+++ b/src/core/census/grpc_context.h
@@ -31,12 +31,12 @@
*
*/
-#ifndef _ADAPTER__ERROR_H_
-#define _ADAPTER__ERROR_H_
+/* GRPC <--> CENSUS context interface */
-#include <Python.h>
-#include <grpc/grpc.h>
+#ifndef CENSUS_GRPC_CONTEXT_H
+#define CENSUS_GRPC_CONTEXT_H
-const PyObject *pygrpc_translate_call_error(grpc_call_error call_error);
+void *grpc_census_context_create();
+void grpc_census_context_destroy(void *context);
-#endif /* _ADAPTER__ERROR_H_ */
+#endif /* CENSUS_GRPC_CONTEXT_H */
diff --git a/src/python/src/grpc/_adapter/_server.h b/src/core/census/initialize.c
index d31d4e678b..057ac78ee7 100644
--- a/src/python/src/grpc/_adapter/_server.h
+++ b/src/core/census/initialize.c
@@ -31,21 +31,20 @@
*
*/
-#ifndef _ADAPTER__SERVER_H_
-#define _ADAPTER__SERVER_H_
+#include <grpc/census.h>
-#include <Python.h>
-#include <grpc/grpc.h>
+static int census_fns_enabled = CENSUS_NONE;
-#include "grpc/_adapter/_completion_queue.h"
+int census_initialize(int functions) {
+ if (census_fns_enabled != CENSUS_NONE) {
+ return 1;
+ }
+ if (functions != CENSUS_NONE) {
+ return 1;
+ } else {
+ census_fns_enabled = functions;
+ return 0;
+ }
+}
-typedef struct {
- PyObject_HEAD
-
- CompletionQueue *completion_queue;
- grpc_server *c_server;
-} Server;
-
-int pygrpc_add_server(PyObject *module);
-
-#endif /* _ADAPTER__SERVER_H_ */
+void census_shutdown() { census_fns_enabled = CENSUS_NONE; }
diff --git a/src/core/channel/child_channel.c b/src/core/channel/child_channel.c
index bdd4147117..cdc1c11681 100644
--- a/src/core/channel/child_channel.c
+++ b/src/core/channel/child_channel.c
@@ -58,6 +58,9 @@ typedef struct {
gpr_uint8 sending_farewell;
/* have we sent farewell (goaway + disconnect) */
gpr_uint8 sent_farewell;
+
+ grpc_iomgr_closure finally_destroy_channel_closure;
+ grpc_iomgr_closure send_farewells_closure;
} lb_channel_data;
typedef struct { grpc_child_channel *channel; } lb_call_data;
@@ -222,12 +225,16 @@ static void maybe_destroy_channel(grpc_child_channel *channel) {
lb_channel_data *chand = LINK_BACK_ELEM_FROM_CHANNEL(channel)->channel_data;
if (chand->destroyed && chand->disconnected && chand->active_calls == 0 &&
!chand->sending_farewell && !chand->calling_back) {
- grpc_iomgr_add_callback(finally_destroy_channel, channel);
+ chand->finally_destroy_channel_closure.cb = finally_destroy_channel;
+ chand->finally_destroy_channel_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&chand->finally_destroy_channel_closure);
} else if (chand->destroyed && !chand->disconnected &&
chand->active_calls == 0 && !chand->sending_farewell &&
!chand->sent_farewell) {
chand->sending_farewell = 1;
- grpc_iomgr_add_callback(send_farewells, channel);
+ chand->send_farewells_closure.cb = send_farewells;
+ chand->send_farewells_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&chand->send_farewells_closure);
}
}
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index 4a55049b0b..a43e3ed278 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -90,6 +90,7 @@ static grpc_fd *alloc_fd(int fd) {
gpr_mu_init(&r->set_state_mu);
gpr_mu_init(&r->watcher_mu);
}
+
gpr_atm_rel_store(&r->refst, 1);
gpr_atm_rel_store(&r->readst, NOT_READY);
gpr_atm_rel_store(&r->writest, NOT_READY);
@@ -133,8 +134,7 @@ static void unref_by(grpc_fd *fd, int n) {
#endif
old = gpr_atm_full_fetch_add(&fd->refst, -n);
if (old == n) {
- close(fd->fd);
- grpc_iomgr_add_callback(fd->on_done, fd->on_done_user_data);
+ grpc_iomgr_add_callback(&fd->on_done_closure);
freelist_fd(fd);
grpc_iomgr_unregister_object(&fd->iomgr_object);
} else {
@@ -196,8 +196,8 @@ static void wake_all_watchers_locked(grpc_fd *fd) {
}
void grpc_fd_orphan(grpc_fd *fd, grpc_iomgr_cb_func on_done, void *user_data) {
- fd->on_done = on_done ? on_done : do_nothing;
- fd->on_done_user_data = user_data;
+ grpc_iomgr_closure_init(&fd->on_done_closure, on_done ? on_done : do_nothing,
+ user_data);
shutdown(fd->fd, SHUT_RDWR);
REF_BY(fd, 1, "orphan"); /* remove active status, but keep referenced */
gpr_mu_lock(&fd->watcher_mu);
@@ -225,21 +225,20 @@ void grpc_fd_unref(grpc_fd *fd) {
}
#endif
-static void make_callback(grpc_iomgr_cb_func cb, void *arg, int success,
+static void process_callback(grpc_iomgr_closure *closure, int success,
int allow_synchronous_callback) {
if (allow_synchronous_callback) {
- cb(arg, success);
+ closure->cb(closure->cb_arg, success);
} else {
- grpc_iomgr_add_delayed_callback(cb, arg, success);
+ grpc_iomgr_add_delayed_callback(closure, success);
}
}
-static void make_callbacks(grpc_iomgr_closure *callbacks, size_t n, int success,
- int allow_synchronous_callback) {
+static void process_callbacks(grpc_iomgr_closure *callbacks, size_t n,
+ int success, int allow_synchronous_callback) {
size_t i;
for (i = 0; i < n; i++) {
- make_callback(callbacks[i].cb, callbacks[i].cb_arg, success,
- allow_synchronous_callback);
+ process_callback(callbacks + i, success, allow_synchronous_callback);
}
}
@@ -264,10 +263,9 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
/* swap was unsuccessful due to an intervening set_ready call.
Fall through to the READY code below */
case READY:
- assert(gpr_atm_no_barrier_load(st) == READY);
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) == READY);
gpr_atm_rel_store(st, NOT_READY);
- make_callback(closure->cb, closure->cb_arg,
- !gpr_atm_acq_load(&fd->shutdown),
+ process_callback(closure, !gpr_atm_acq_load(&fd->shutdown),
allow_synchronous_callback);
return;
default: /* WAITING */
@@ -281,7 +279,7 @@ static void notify_on(grpc_fd *fd, gpr_atm *st, grpc_iomgr_closure *closure,
abort();
}
-static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
+static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure **callbacks,
size_t *ncallbacks) {
gpr_intptr state = gpr_atm_acq_load(st);
@@ -299,9 +297,9 @@ static void set_ready_locked(gpr_atm *st, grpc_iomgr_closure *callbacks,
Fall through to the WAITING code below */
state = gpr_atm_acq_load(st);
default: /* waiting */
- assert(gpr_atm_no_barrier_load(st) != READY &&
- gpr_atm_no_barrier_load(st) != NOT_READY);
- callbacks[(*ncallbacks)++] = *(grpc_iomgr_closure *)state;
+ GPR_ASSERT(gpr_atm_no_barrier_load(st) != READY &&
+ gpr_atm_no_barrier_load(st) != NOT_READY);
+ callbacks[(*ncallbacks)++] = (grpc_iomgr_closure *)state;
gpr_atm_rel_store(st, NOT_READY);
return;
}
@@ -312,25 +310,30 @@ static void set_ready(grpc_fd *fd, gpr_atm *st,
/* only one set_ready can be active at once (but there may be a racing
notify_on) */
int success;
- grpc_iomgr_closure cb;
+ grpc_iomgr_closure* closure;
size_t ncb = 0;
+
gpr_mu_lock(&fd->set_state_mu);
- set_ready_locked(st, &cb, &ncb);
+ set_ready_locked(st, &closure, &ncb);
gpr_mu_unlock(&fd->set_state_mu);
success = !gpr_atm_acq_load(&fd->shutdown);
- make_callbacks(&cb, ncb, success, allow_synchronous_callback);
+ GPR_ASSERT(ncb <= 1);
+ if (ncb > 0) {
+ process_callbacks(closure, ncb, success, allow_synchronous_callback);
+ }
}
void grpc_fd_shutdown(grpc_fd *fd) {
- grpc_iomgr_closure cb[2];
size_t ncb = 0;
gpr_mu_lock(&fd->set_state_mu);
GPR_ASSERT(!gpr_atm_no_barrier_load(&fd->shutdown));
gpr_atm_rel_store(&fd->shutdown, 1);
- set_ready_locked(&fd->readst, cb, &ncb);
- set_ready_locked(&fd->writest, cb, &ncb);
+ set_ready_locked(&fd->readst, &fd->shutdown_closures[0], &ncb);
+ set_ready_locked(&fd->writest, &fd->shutdown_closures[0], &ncb);
gpr_mu_unlock(&fd->set_state_mu);
- make_callbacks(cb, ncb, 0, 0);
+ GPR_ASSERT(ncb <= 2);
+ process_callbacks(fd->shutdown_closures[0], ncb, 0 /* GPR_FALSE */,
+ 0 /* GPR_FALSE */);
}
void grpc_fd_notify_on_read(grpc_fd *fd, grpc_iomgr_closure *closure) {
diff --git a/src/core/iomgr/fd_posix.h b/src/core/iomgr/fd_posix.h
index d1d6c8fe1f..70992aead5 100644
--- a/src/core/iomgr/fd_posix.h
+++ b/src/core/iomgr/fd_posix.h
@@ -40,11 +40,6 @@
#include <grpc/support/sync.h>
#include <grpc/support/time.h>
-typedef struct {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
-} grpc_iomgr_closure;
-
typedef struct grpc_fd grpc_fd;
typedef struct grpc_fd_watcher {
@@ -96,10 +91,11 @@ struct grpc_fd {
gpr_atm readst;
gpr_atm writest;
- grpc_iomgr_cb_func on_done;
- void *on_done_user_data;
struct grpc_fd *freelist_next;
+ grpc_iomgr_closure on_done_closure;
+ grpc_iomgr_closure *shutdown_closures[2];
+
grpc_iomgr_object iomgr_object;
};
diff --git a/src/core/iomgr/iomgr.c b/src/core/iomgr/iomgr.c
index 8266b9247e..c6c44658df 100644
--- a/src/core/iomgr/iomgr.c
+++ b/src/core/iomgr/iomgr.c
@@ -43,17 +43,10 @@
#include <grpc/support/thd.h>
#include <grpc/support/sync.h>
-typedef struct delayed_callback {
- grpc_iomgr_cb_func cb;
- void *cb_arg;
- int success;
- struct delayed_callback *next;
-} delayed_callback;
-
static gpr_mu g_mu;
static gpr_cv g_rcv;
-static delayed_callback *g_cbs_head = NULL;
-static delayed_callback *g_cbs_tail = NULL;
+static grpc_iomgr_closure *g_cbs_head = NULL;
+static grpc_iomgr_closure *g_cbs_tail = NULL;
static int g_shutdown;
static gpr_event g_background_callback_executor_done;
static grpc_iomgr_object g_root_object;
@@ -67,12 +60,11 @@ static void background_callback_executor(void *ignored) {
gpr_timespec short_deadline =
gpr_time_add(gpr_now(), gpr_time_from_millis(100));
if (g_cbs_head) {
- delayed_callback *cb = g_cbs_head;
- g_cbs_head = cb->next;
+ grpc_iomgr_closure *closure = g_cbs_head;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, cb->success);
- gpr_free(cb);
+ closure->cb(closure->cb_arg, closure->success);
gpr_mu_lock(&g_mu);
} else if (grpc_alarm_check(&g_mu, gpr_now(), &deadline)) {
} else {
@@ -114,8 +106,8 @@ static size_t count_objects(void) {
}
void grpc_iomgr_shutdown(void) {
- delayed_callback *cb;
grpc_iomgr_object *obj;
+ grpc_iomgr_closure *closure;
gpr_timespec shutdown_deadline =
gpr_time_add(gpr_now(), gpr_time_from_seconds(10));
@@ -135,13 +127,12 @@ void grpc_iomgr_shutdown(void) {
}
if (g_cbs_head) {
do {
- cb = g_cbs_head;
- g_cbs_head = cb->next;
+ closure = g_cbs_head;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
- cb->cb(cb->cb_arg, 0);
- gpr_free(cb);
+ closure->cb(closure->cb_arg, 0);
gpr_mu_lock(&g_mu);
} while (g_cbs_head);
continue;
@@ -202,19 +193,23 @@ void grpc_iomgr_unregister_object(grpc_iomgr_object *obj) {
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success) {
- delayed_callback *dcb = gpr_malloc(sizeof(delayed_callback));
- dcb->cb = cb;
- dcb->cb_arg = cb_arg;
- dcb->success = success;
+
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg) {
+ closure->cb = cb;
+ closure->cb_arg = cb_arg;
+ closure->next = NULL;
+}
+
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *closure, int success) {
+ closure->success = success;
gpr_mu_lock(&g_mu);
- dcb->next = NULL;
+ closure->next = NULL;
if (!g_cbs_tail) {
- g_cbs_head = g_cbs_tail = dcb;
+ g_cbs_head = g_cbs_tail = closure;
} else {
- g_cbs_tail->next = dcb;
- g_cbs_tail = dcb;
+ g_cbs_tail->next = closure;
+ g_cbs_tail = closure;
}
if (g_shutdown) {
gpr_cv_signal(&g_rcv);
@@ -222,25 +217,27 @@ void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
gpr_mu_unlock(&g_mu);
}
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg) {
- grpc_iomgr_add_delayed_callback(cb, cb_arg, 1);
+
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure) {
+ grpc_iomgr_add_delayed_callback(closure, 1 /* GPR_TRUE */);
}
+
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
int n = 0;
gpr_mu *retake_mu = NULL;
- delayed_callback *cb;
+ grpc_iomgr_closure *closure;
for (;;) {
/* check for new work */
if (!gpr_mu_trylock(&g_mu)) {
break;
}
- cb = g_cbs_head;
- if (!cb) {
+ closure = g_cbs_head;
+ if (!closure) {
gpr_mu_unlock(&g_mu);
break;
}
- g_cbs_head = cb->next;
+ g_cbs_head = closure->next;
if (!g_cbs_head) g_cbs_tail = NULL;
gpr_mu_unlock(&g_mu);
/* if we have a mutex to drop, do so before executing work */
@@ -249,8 +246,7 @@ int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success) {
retake_mu = drop_mu;
drop_mu = NULL;
}
- cb->cb(cb->cb_arg, success && cb->success);
- gpr_free(cb);
+ closure->cb(closure->cb_arg, success && closure->success);
n++;
}
if (retake_mu) {
diff --git a/src/core/iomgr/iomgr.h b/src/core/iomgr/iomgr.h
index 1f5d23fdda..a10e481e48 100644
--- a/src/core/iomgr/iomgr.h
+++ b/src/core/iomgr/iomgr.h
@@ -34,14 +34,43 @@
#ifndef GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_H
-/* gRPC Callback definition */
+/** gRPC Callback definition.
+ *
+ * \param arg Arbitrary input.
+ * \param success An indication on the state of the iomgr. On false, cleanup
+ * actions should be taken (eg, shutdown). */
typedef void (*grpc_iomgr_cb_func)(void *arg, int success);
+/** A closure over a grpc_iomgr_cb_func. */
+typedef struct grpc_iomgr_closure {
+ /** Bound callback. */
+ grpc_iomgr_cb_func cb;
+
+ /** Arguments to be passed to "cb". */
+ void *cb_arg;
+
+ /** Internal. A boolean indication to "cb" on the state of the iomgr.
+ * For instance, closures created during a shutdown would have this field set
+ * to false. */
+ int success;
+
+ /**< Internal. Do not touch */
+ struct grpc_iomgr_closure *next;
+} grpc_iomgr_closure;
+
+/** Initializes \a closure with \a cb and \a cb_arg. */
+void grpc_iomgr_closure_init(grpc_iomgr_closure *closure, grpc_iomgr_cb_func cb,
+ void *cb_arg);
+
+/** Initializes the iomgr. */
void grpc_iomgr_init(void);
+
+/** Signals the intention to shutdown the iomgr. */
void grpc_iomgr_shutdown(void);
-/* This function is called from within a callback or from anywhere else
- and causes the invocation of a callback at some point in the future */
-void grpc_iomgr_add_callback(grpc_iomgr_cb_func cb, void *cb_arg);
+/** Registers a closure to be invoked at some point in the future.
+ *
+ * Can be called from within a callback or from anywhere else */
+void grpc_iomgr_add_callback(grpc_iomgr_closure *closure);
#endif /* GRPC_INTERNAL_CORE_IOMGR_IOMGR_H */
diff --git a/src/core/iomgr/iomgr_internal.h b/src/core/iomgr/iomgr_internal.h
index 54eadf1edc..6c1e0e1799 100644
--- a/src/core/iomgr/iomgr_internal.h
+++ b/src/core/iomgr/iomgr_internal.h
@@ -35,7 +35,6 @@
#define GRPC_INTERNAL_CORE_IOMGR_IOMGR_INTERNAL_H
#include "src/core/iomgr/iomgr.h"
-#include "src/core/iomgr/iomgr_internal.h"
#include <grpc/support/sync.h>
typedef struct grpc_iomgr_object {
@@ -45,8 +44,7 @@ typedef struct grpc_iomgr_object {
} grpc_iomgr_object;
int grpc_maybe_call_delayed_callbacks(gpr_mu *drop_mu, int success);
-void grpc_iomgr_add_delayed_callback(grpc_iomgr_cb_func cb, void *cb_arg,
- int success);
+void grpc_iomgr_add_delayed_callback(grpc_iomgr_closure *iocb, int success);
void grpc_iomgr_register_object(grpc_iomgr_object *obj, const char *name);
void grpc_iomgr_unregister_object(grpc_iomgr_object *obj);
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 96f8acfd7e..6ffba4dbac 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -193,6 +193,7 @@ typedef struct grpc_unary_promote_args {
const grpc_pollset_vtable *original_vtable;
grpc_pollset *pollset;
grpc_fd *fd;
+ grpc_iomgr_closure promotion_closure;
} grpc_unary_promote_args;
static void basic_do_promote(void *args, int success) {
@@ -215,7 +216,7 @@ static void basic_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
if (pollset->counter != 0) {
grpc_pollset_kick(pollset);
- grpc_iomgr_add_callback(basic_do_promote, up_args);
+ grpc_iomgr_add_callback(&up_args->promotion_closure);
gpr_mu_unlock(&pollset->mu);
return;
}
@@ -302,7 +303,9 @@ static void basic_pollset_add_fd(grpc_pollset *pollset, grpc_fd *fd) {
up_args->pollset = pollset;
up_args->fd = fd;
up_args->original_vtable = pollset->vtable;
- grpc_iomgr_add_callback(basic_do_promote, up_args);
+ up_args->promotion_closure.cb = unary_poll_do_promote;
+ up_args->promotion_closure.cb_arg = up_args;
+ grpc_iomgr_add_callback(&up_args->promotion_closure);
grpc_pollset_kick(pollset);
}
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 8484bb1ff0..88db774cc4 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -64,15 +64,15 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
gpr_timespec now;
now = gpr_now();
if (gpr_time_cmp(now, deadline) > 0) {
- return 0;
+ return 0 /* GPR_FALSE */;
}
- if (grpc_maybe_call_delayed_callbacks(NULL, 1)) {
- return 1;
+ if (grpc_maybe_call_delayed_callbacks(NULL, 1 /* GPR_TRUE */)) {
+ return 1 /* GPR_TRUE */;
}
if (grpc_alarm_check(NULL, now, &deadline)) {
- return 1;
+ return 1 /* GPR_TRUE */;
}
- return 0;
+ return 0 /* GPR_FALSE */;
}
void grpc_pollset_kick(grpc_pollset *p) { }
diff --git a/src/core/iomgr/socket_windows.c b/src/core/iomgr/socket_windows.c
index ee5150a696..805fa8a4fc 100644
--- a/src/core/iomgr/socket_windows.c
+++ b/src/core/iomgr/socket_windows.c
@@ -39,7 +39,6 @@
#include <grpc/support/log.h>
#include "src/core/iomgr/iocp_windows.h"
-#include "src/core/iomgr/iomgr.h"
#include "src/core/iomgr/iomgr_internal.h"
#include "src/core/iomgr/pollset.h"
#include "src/core/iomgr/pollset_windows.h"
@@ -64,13 +63,15 @@ int grpc_winsocket_shutdown(grpc_winsocket *socket) {
gpr_mu_lock(&socket->state_mu);
if (socket->read_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->read_info.cb,
- socket->read_info.opaque, 0);
+ grpc_iomgr_closure_init(&socket->shutdown_closure, socket->read_info.cb,
+ socket->read_info.opaque);
+ grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
}
if (socket->write_info.cb) {
callbacks_set++;
- grpc_iomgr_add_delayed_callback(socket->write_info.cb,
- socket->write_info.opaque, 0);
+ grpc_iomgr_closure_init(&socket->shutdown_closure, socket->write_info.cb,
+ socket->write_info.opaque);
+ grpc_iomgr_add_delayed_callback(&socket->shutdown_closure, 0);
}
gpr_mu_unlock(&socket->state_mu);
return callbacks_set;
diff --git a/src/core/iomgr/socket_windows.h b/src/core/iomgr/socket_windows.h
index b27eb14219..d5fee39604 100644
--- a/src/core/iomgr/socket_windows.h
+++ b/src/core/iomgr/socket_windows.h
@@ -39,6 +39,8 @@
#include <grpc/support/sync.h>
#include <grpc/support/atm.h>
+#include "src/core/iomgr/iomgr.h"
+
/* This holds the data for an outstanding read or write on a socket.
The mutex to protect the concurrent access to that data is the one
inside the winsocket wrapper. */
@@ -93,6 +95,8 @@ typedef struct grpc_winsocket {
there is a pending operation that the IO Completion Port will have to
wait for. The socket will be collected at that time. */
int orphan;
+
+ grpc_iomgr_closure shutdown_closure;
} grpc_winsocket;
/* Create a wrapped windows handle. This takes ownership of it, meaning that
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index cd6b2ecae6..2f19f9d442 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -280,6 +280,8 @@ typedef struct {
grpc_iomgr_closure read_closure;
grpc_iomgr_closure write_closure;
+
+ grpc_iomgr_closure handle_read_closure;
} grpc_tcp;
static void grpc_tcp_handle_read(void *arg /* grpc_tcp */, int success);
@@ -443,7 +445,8 @@ static void grpc_tcp_notify_on_read(grpc_endpoint *ep, grpc_endpoint_read_cb cb,
tcp->finished_edge = 0;
grpc_fd_notify_on_read(tcp->em_fd, &tcp->read_closure);
} else {
- grpc_iomgr_add_callback(grpc_tcp_handle_read, tcp);
+ tcp->handle_read_closure.cb_arg = tcp;
+ grpc_iomgr_add_callback(&tcp->handle_read_closure);
}
}
@@ -592,6 +595,8 @@ grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size) {
tcp->read_closure.cb_arg = tcp;
tcp->write_closure.cb = grpc_tcp_handle_write;
tcp->write_closure.cb_arg = tcp;
+
+ tcp->handle_read_closure.cb = grpc_tcp_handle_read;
return &tcp->base;
}
diff --git a/src/core/security/credentials.c b/src/core/security/credentials.c
index 3894f1be4f..f3d0cf5452 100644
--- a/src/core/security/credentials.c
+++ b/src/core/security/credentials.c
@@ -54,6 +54,7 @@
typedef struct {
grpc_credentials *creds;
grpc_credentials_metadata_cb cb;
+ grpc_iomgr_closure *on_simulated_token_fetch_done_closure;
void *user_data;
} grpc_credentials_metadata_request;
@@ -65,6 +66,8 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
gpr_malloc(sizeof(grpc_credentials_metadata_request));
r->creds = grpc_credentials_ref(creds);
r->cb = cb;
+ r->on_simulated_token_fetch_done_closure =
+ gpr_malloc(sizeof(grpc_iomgr_closure));
r->user_data = user_data;
return r;
}
@@ -72,6 +75,7 @@ grpc_credentials_metadata_request_create(grpc_credentials *creds,
static void grpc_credentials_metadata_request_destroy(
grpc_credentials_metadata_request *r) {
grpc_credentials_unref(r->creds);
+ gpr_free(r->on_simulated_token_fetch_done_closure);
gpr_free(r);
}
@@ -843,9 +847,11 @@ static void fake_oauth2_get_request_metadata(grpc_credentials *creds,
grpc_fake_oauth2_credentials *c = (grpc_fake_oauth2_credentials *)creds;
if (c->is_async) {
- grpc_iomgr_add_callback(
- on_simulated_token_fetch_done,
- grpc_credentials_metadata_request_create(creds, cb, user_data));
+ grpc_credentials_metadata_request *cb_arg =
+ grpc_credentials_metadata_request_create(creds, cb, user_data);
+ grpc_iomgr_closure_init(cb_arg->on_simulated_token_fetch_done_closure,
+ on_simulated_token_fetch_done, cb_arg);
+ grpc_iomgr_add_callback(cb_arg->on_simulated_token_fetch_done_closure);
} else {
cb(user_data, c->access_token_md->entries, 1, GRPC_CREDENTIALS_OK);
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 138cd084c5..1e066ea42b 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -31,6 +31,7 @@
*
*/
+#include "src/core/census/grpc_context.h"
#include "src/core/surface/call.h"
#include "src/core/channel/channel_stack.h"
#include "src/core/iomgr/alarm.h"
@@ -228,6 +229,7 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
+ grpc_iomgr_closure destroy_closure;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -273,6 +275,8 @@ grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
if (call->is_client) {
call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
+ call->context[GRPC_CONTEXT_TRACING].value = grpc_census_context_create();
+ call->context[GRPC_CONTEXT_TRACING].destroy = grpc_census_context_destroy;
}
GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
for (i = 0; i < add_initial_metadata_count; i++) {
@@ -374,7 +378,9 @@ void grpc_call_internal_unref(grpc_call *c, int allow_immediate_deletion) {
if (allow_immediate_deletion) {
destroy_call(c, 1);
} else {
- grpc_iomgr_add_callback(destroy_call, c);
+ c->destroy_closure.cb = destroy_call;
+ c->destroy_closure.cb_arg = c;
+ grpc_iomgr_add_callback(&c->destroy_closure);
}
}
}
@@ -1309,8 +1315,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag);
}
-void grpc_call_context_set(grpc_call *call, grpc_context_index elem, void *value,
- void (*destroy)(void *value)) {
+void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
+ void *value, void (*destroy)(void *value)) {
if (call->context[elem].destroy) {
call->context[elem].destroy(call->context[elem].value);
}
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index d8fd442702..c6cfdb014c 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -61,6 +61,7 @@ struct grpc_channel {
gpr_mu registered_call_mu;
registered_call *registered_calls;
+ grpc_iomgr_closure destroy_closure;
};
#define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
@@ -204,8 +205,10 @@ void grpc_channel_internal_unref(grpc_channel *c, const char *reason) {
#else
void grpc_channel_internal_unref(grpc_channel *c) {
#endif
- if (gpr_unref(&c->refs)) {
- grpc_iomgr_add_callback(destroy_channel, c);
+ if (gpr_unref(&channel->refs)) {
+ channel->destroy_closure.cb = destroy_channel;
+ channel->destroy_closure.cb_arg = channel;
+ grpc_iomgr_add_callback(&channel->destroy_closure);
}
}
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 061669ec6e..a49b754d9a 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -196,9 +196,10 @@ grpc_channel *grpc_channel_create(const char *target,
const grpc_channel_filter *filters[MAX_FILTERS];
int n = 0;
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args, mdctx, 1);
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index d6eb9b2c24..ac6871c6f2 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -31,11 +31,11 @@
*
*/
+#include <grpc/census.h>
#include <grpc/grpc.h>
#include "src/core/channel/channel_stack.h"
#include "src/core/debug/trace.h"
#include "src/core/iomgr/iomgr.h"
-#include "src/core/statistics/census_interface.h"
#include "src/core/profiling/timers.h"
#include "src/core/surface/call.h"
#include "src/core/surface/init.h"
@@ -64,7 +64,9 @@ void grpc_init(void) {
grpc_security_pre_init();
grpc_iomgr_init();
grpc_tracer_init("GRPC_TRACE");
- census_init();
+ if (census_initialize(CENSUS_NONE)) {
+ gpr_log(GPR_ERROR, "Could not initialize census.");
+ }
grpc_timers_global_init();
}
gpr_mu_unlock(&g_init_mu);
diff --git a/src/core/surface/secure_channel_create.c b/src/core/surface/secure_channel_create.c
index b9c6d5f760..3875eb0614 100644
--- a/src/core/surface/secure_channel_create.c
+++ b/src/core/surface/secure_channel_create.c
@@ -235,9 +235,10 @@ grpc_channel *grpc_secure_channel_create(grpc_credentials *creds,
new_args_from_connector != NULL ? new_args_from_connector : args,
&connector_arg);
filters[n++] = &grpc_client_surface_filter;
+ /* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
- }
+ } */
filters[n++] = &grpc_client_channel_filter;
GPR_ASSERT(n <= MAX_FILTERS);
channel = grpc_channel_create_from_filters(filters, n, args_copy, mdctx, 1);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 99e3ee6d2e..051c325b83 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -123,6 +123,8 @@ struct channel_data {
channel_registered_method *registered_methods;
gpr_uint32 registered_method_slots;
gpr_uint32 registered_method_max_probes;
+ grpc_iomgr_closure finish_shutdown_channel_closure;
+ grpc_iomgr_closure finish_destroy_channel_closure;
};
typedef struct shutdown_tag {
@@ -184,6 +186,8 @@ struct call_data {
void (*on_done_recv)(void *user_data, int success);
void *recv_user_data;
+ grpc_iomgr_closure kill_zombie_closure;
+
call_data **root[CALL_LIST_COUNT];
call_link links[CALL_LIST_COUNT];
};
@@ -317,7 +321,9 @@ static void destroy_channel(channel_data *chand) {
orphan_channel(chand);
server_ref(chand->server);
maybe_finish_shutdown(chand->server);
- grpc_iomgr_add_callback(finish_destroy_channel, chand);
+ chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
+ chand->finish_destroy_channel_closure.cb_arg = chand;
+ grpc_iomgr_add_callback(&chand->finish_destroy_channel_closure);
}
static void finish_start_new_rpc_and_unlock(grpc_server *server,
@@ -473,7 +479,8 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
}
gpr_mu_unlock(&chand->server->mu);
break;
@@ -481,11 +488,14 @@ static void server_on_recv(void *ptr, int success) {
gpr_mu_lock(&chand->server->mu);
if (calld->state == NOT_STARTED) {
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
calld->state = ZOMBIED;
- grpc_iomgr_add_callback(kill_zombie, elem);
+ grpc_iomgr_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
+ grpc_iomgr_add_callback(&calld->kill_zombie_closure);
+
}
if (call_list_remove(calld, ALL_CALLS)) {
decrement_call_count(chand);
@@ -580,16 +590,16 @@ static void finish_shutdown_channel(void *p, int success) {
gpr_free(sca);
}
-static void shutdown_channel(channel_data *chand, int send_goaway,
- int send_disconnect) {
+static void shutdown_channel(channel_data *chand) {
shutdown_channel_args *sca;
- gpr_log(GPR_DEBUG, "shutdown_channel: %p %d %d", chand, send_goaway, send_disconnect);
GRPC_CHANNEL_INTERNAL_REF(chand->channel, "shutdown");
sca = gpr_malloc(sizeof(shutdown_channel_args));
sca->chand = chand;
sca->send_goaway = send_goaway;
sca->send_disconnect = send_disconnect;
- grpc_iomgr_add_callback(finish_shutdown_channel, sca);
+ chand->finish_shutdown_channel_closure.cb = finish_shutdown_channel;
+ chand->finish_shutdown_channel_closure.cb_arg = sca;
+ grpc_iomgr_add_callback(&chand->finish_shutdown_channel_closure);
}
static void init_call_elem(grpc_call_element *elem,
@@ -708,7 +718,9 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
size_t filter_count,
const grpc_channel_args *args) {
size_t i;
- int census_enabled = grpc_channel_args_is_census_enabled(args);
+ /* TODO(census): restore this once we finalize census filter etc.
+ int census_enabled = grpc_channel_args_is_census_enabled(args); */
+ int census_enabled = 0;
grpc_server *server = gpr_malloc(sizeof(grpc_server));
@@ -733,9 +745,10 @@ grpc_server *grpc_server_create_from_filters(grpc_channel_filter **filters,
server->channel_filters =
gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
server->channel_filters[0] = &server_surface_filter;
+ /* TODO(census): restore this once we rework census filter
if (census_enabled) {
server->channel_filters[1] = &grpc_server_census_filter;
- }
+ } */
for (i = 0; i < filter_count; i++) {
server->channel_filters[i + 1 + census_enabled] = filters[i];
}
@@ -1004,6 +1017,7 @@ void grpc_server_destroy(grpc_server *server) {
server->listeners = l->next;
gpr_free(l);
}
+
gpr_mu_unlock(&server->mu);
server_unref(server);
diff --git a/src/core/transport/chttp2/gen_hpack_tables.c b/src/core/transport/chttp2/gen_hpack_tables.c
index 86b593129b..bdaa3cf094 100644
--- a/src/core/transport/chttp2/gen_hpack_tables.c
+++ b/src/core/transport/chttp2/gen_hpack_tables.c
@@ -219,10 +219,10 @@ static int state_index(int bitofs, symset syms, int *isnew) {
emit - the symbol to emit on this nibble (or -1 if no symbol has been
found)
syms - the set of symbols that could be matched */
-static void build_dec_tbl(int state, int nibble, int nibbits, int bitofs,
+static void build_dec_tbl(int state, int nibble, int nibbits, unsigned bitofs,
int emit, symset syms) {
int i;
- int bit;
+ unsigned bit;
/* If we have four bits in the nibble we're looking at, then we can fill in
a slot in the lookup tables. */
@@ -338,7 +338,7 @@ static void generate_base64_inverse_table(void) {
static const char alphabet[] =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
unsigned char inverse[256];
- int i;
+ unsigned i;
memset(inverse, 255, sizeof(inverse));
for (i = 0; i < strlen(alphabet); i++) {
diff --git a/src/cpp/server/thread_pool.cc b/src/cpp/server/thread_pool.cc
index e8d0e89ed2..118cabcb61 100644
--- a/src/cpp/server/thread_pool.cc
+++ b/src/cpp/server/thread_pool.cc
@@ -60,7 +60,7 @@ void ThreadPool::ThreadFunc() {
ThreadPool::ThreadPool(int num_threads) : shutdown_(false) {
for (int i = 0; i < num_threads; i++) {
- threads_.push_back(grpc::thread(&ThreadPool::ThreadFunc, this));
+ threads_.push_back(new grpc::thread(&ThreadPool::ThreadFunc, this));
}
}
@@ -71,7 +71,8 @@ ThreadPool::~ThreadPool() {
cv_.notify_all();
}
for (auto t = threads_.begin(); t != threads_.end(); t++) {
- t->join();
+ (*t)->join();
+ delete *t;
}
}
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index 0f24d6e9b3..26f25611b5 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -57,7 +57,7 @@ class ThreadPool GRPC_FINAL : public ThreadPoolInterface {
grpc::condition_variable cv_;
bool shutdown_;
std::queue<std::function<void()>> callbacks_;
- std::vector<grpc::thread> threads_;
+ std::vector<grpc::thread*> threads_;
void ThreadFunc();
};
diff --git a/src/python/src/grpc/_adapter/_c.c b/src/python/src/grpc/_adapter/_c.c
deleted file mode 100644
index f096a55b61..0000000000
--- a/src/python/src/grpc/_adapter/_c.c
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <Python.h>
-#include <grpc/grpc.h>
-
-#include "grpc/_adapter/_completion_queue.h"
-#include "grpc/_adapter/_channel.h"
-#include "grpc/_adapter/_call.h"
-#include "grpc/_adapter/_server.h"
-#include "grpc/_adapter/_client_credentials.h"
-#include "grpc/_adapter/_server_credentials.h"
-
-static PyObject *init(PyObject *self) {
- grpc_init();
- Py_RETURN_NONE;
-}
-
-static PyObject *shutdown(PyObject *self) {
- grpc_shutdown();
- Py_RETURN_NONE;
-}
-
-static PyMethodDef _c_methods[] = {
- {"init", (PyCFunction)init, METH_NOARGS,
- "Initialize the module's static state."},
- {"shut_down", (PyCFunction)shutdown, METH_NOARGS,
- "Shut down the module's static state."},
- {NULL},
-};
-
-PyMODINIT_FUNC init_c(void) {
- PyObject *module;
-
- module = Py_InitModule3("_c", _c_methods,
- "Wrappings of C structures and functions.");
-
- if (pygrpc_add_completion_queue(module) == -1) {
- return;
- }
- if (pygrpc_add_channel(module) == -1) {
- return;
- }
- if (pygrpc_add_call(module) == -1) {
- return;
- }
- if (pygrpc_add_server(module) == -1) {
- return;
- }
- if (pygrpc_add_client_credentials(module) == -1) {
- return;
- }
- if (pygrpc_add_server_credentials(module) == -1) {
- return;
- }
-}
diff --git a/src/python/src/grpc/_adapter/_c/module.c b/src/python/src/grpc/_adapter/_c/module.c
new file mode 100644
index 0000000000..1f3aedd9d8
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/module.c
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <stdlib.h>
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+
+#include "grpc/_adapter/_c/types.h"
+
+static PyMethodDef c_methods[] = {
+ {NULL}
+};
+
+PyMODINIT_FUNC init_c(void) {
+ PyObject *module;
+
+ module = Py_InitModule3("_c", c_methods,
+ "Wrappings of C structures and functions.");
+
+ if (pygrpc_module_add_types(module) < 0) {
+ return;
+ }
+
+ /* GRPC maintains an internal counter of how many times it has been
+ initialized and handles multiple pairs of grpc_init()/grpc_shutdown()
+ invocations accordingly. */
+ grpc_init();
+ atexit(&grpc_shutdown);
+}
diff --git a/src/python/src/grpc/_adapter/_c/types.c b/src/python/src/grpc/_adapter/_c/types.c
new file mode 100644
index 0000000000..8855c32ca6
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types.c
@@ -0,0 +1,60 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+
+int pygrpc_module_add_types(PyObject *module) {
+ int i;
+ PyTypeObject *types[] = {
+ &pygrpc_ClientCredentials_type,
+ &pygrpc_ServerCredentials_type,
+ &pygrpc_CompletionQueue_type,
+ &pygrpc_Call_type,
+ &pygrpc_Channel_type,
+ &pygrpc_Server_type
+ };
+ for (i = 0; i < sizeof(types)/sizeof(PyTypeObject *); ++i) {
+ if (PyType_Ready(types[i]) < 0) {
+ return -1;
+ }
+ }
+ for (i = 0; i < sizeof(types)/sizeof(PyTypeObject *); ++i) {
+ Py_INCREF(types[i]);
+ PyModule_AddObject(module, types[i]->tp_name, (PyObject *)types[i]);
+ }
+ return 0;
+}
diff --git a/src/python/src/grpc/_adapter/_c/types.h b/src/python/src/grpc/_adapter/_c/types.h
new file mode 100644
index 0000000000..e189ae2566
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types.h
@@ -0,0 +1,271 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC__ADAPTER__C_TYPES_H_
+#define GRPC__ADAPTER__C_TYPES_H_
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/grpc_security.h>
+
+
+/*=========================*/
+/* Client-side credentials */
+/*=========================*/
+
+typedef struct ClientCredentials {
+ PyObject_HEAD
+ grpc_credentials *c_creds;
+} ClientCredentials;
+void pygrpc_ClientCredentials_dealloc(ClientCredentials *self);
+ClientCredentials *pygrpc_ClientCredentials_google_default(
+ PyTypeObject *type, PyObject *ignored);
+ClientCredentials *pygrpc_ClientCredentials_ssl(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ClientCredentials *pygrpc_ClientCredentials_composite(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ClientCredentials *pygrpc_ClientCredentials_compute_engine(
+ PyTypeObject *type, PyObject *ignored);
+ClientCredentials *pygrpc_ClientCredentials_service_account(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ClientCredentials *pygrpc_ClientCredentials_jwt(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ClientCredentials *pygrpc_ClientCredentials_refresh_token(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ClientCredentials *pygrpc_ClientCredentials_fake_transport_security(
+ PyTypeObject *type, PyObject *ignored);
+ClientCredentials *pygrpc_ClientCredentials_iam(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+extern PyTypeObject pygrpc_ClientCredentials_type;
+
+
+/*=========================*/
+/* Server-side credentials */
+/*=========================*/
+
+typedef struct ServerCredentials {
+ PyObject_HEAD
+ grpc_server_credentials *c_creds;
+} ServerCredentials;
+void pygrpc_ServerCredentials_dealloc(ServerCredentials *self);
+ServerCredentials *pygrpc_ServerCredentials_ssl(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+ServerCredentials *pygrpc_ServerCredentials_fake_transport_security(
+ PyTypeObject *type, PyObject *ignored);
+extern PyTypeObject pygrpc_ServerCredentials_type;
+
+
+/*==================*/
+/* Completion queue */
+/*==================*/
+
+typedef struct CompletionQueue {
+ PyObject_HEAD
+ grpc_completion_queue *c_cq;
+} CompletionQueue;
+CompletionQueue *pygrpc_CompletionQueue_new(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+void pygrpc_CompletionQueue_dealloc(CompletionQueue *self);
+PyObject *pygrpc_CompletionQueue_next(
+ CompletionQueue *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_CompletionQueue_shutdown(
+ CompletionQueue *self, PyObject *ignored);
+extern PyTypeObject pygrpc_CompletionQueue_type;
+
+
+/*======*/
+/* Call */
+/*======*/
+
+typedef struct Call {
+ PyObject_HEAD
+ grpc_call *c_call;
+ CompletionQueue *cq;
+} Call;
+Call *pygrpc_Call_new_empty(CompletionQueue *cq);
+void pygrpc_Call_dealloc(Call *self);
+PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs);
+extern PyTypeObject pygrpc_Call_type;
+
+
+/*=========*/
+/* Channel */
+/*=========*/
+
+typedef struct Channel {
+ PyObject_HEAD
+ grpc_channel *c_chan;
+} Channel;
+Channel *pygrpc_Channel_new(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs);
+void pygrpc_Channel_dealloc(Channel *self);
+Call *pygrpc_Channel_create_call(
+ Channel *self, PyObject *args, PyObject *kwargs);
+extern PyTypeObject pygrpc_Channel_type;
+
+
+/*========*/
+/* Server */
+/*========*/
+
+typedef struct Server {
+ PyObject_HEAD
+ grpc_server *c_serv;
+ CompletionQueue *cq;
+} Server;
+Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs);
+void pygrpc_Server_dealloc(Server *self);
+PyObject *pygrpc_Server_request_call(
+ Server *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Server_add_http2_port(
+ Server *self, PyObject *args, PyObject *kwargs);
+PyObject *pygrpc_Server_start(Server *self, PyObject *ignored);
+PyObject *pygrpc_Server_shutdown(
+ Server *self, PyObject *args, PyObject *kwargs);
+extern PyTypeObject pygrpc_Server_type;
+
+/*=========*/
+/* Utility */
+/*=========*/
+
+/* Every tag that passes from Python GRPC to GRPC core is of this type. */
+typedef struct pygrpc_tag {
+ PyObject *user_tag;
+ Call *call;
+ grpc_call_details request_call_details;
+ grpc_metadata_array request_metadata;
+ grpc_op *ops;
+ size_t nops;
+ int is_new_call;
+} pygrpc_tag;
+
+/* Construct a tag associated with a batch call. Does not take ownership of the
+ resources in the elements of ops. */
+pygrpc_tag *pygrpc_produce_batch_tag(PyObject *user_tag, Call *call,
+ grpc_op *ops, size_t nops);
+
+
+/* Construct a tag associated with a server request. The calling code should
+ use the appropriate fields of the produced tag in the invocation of
+ grpc_server_request_call. */
+pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call);
+
+/* Construct a tag associated with a server shutdown. */
+pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag);
+
+/* Frees all resources owned by the tag and the tag itself. */
+void pygrpc_discard_tag(pygrpc_tag *tag);
+
+/* Consumes an event and its associated tag, providing a Python tuple of the
+ form `(type, tag, call, call_details, results)` (where type is an integer
+ corresponding to a grpc_completion_type, tag is an arbitrary PyObject, call
+ is the call object associated with the event [if any], call_details is a
+ tuple of form `(method, host, deadline)` [if such details are available],
+ and resultd is a list of tuples of form `(type, metadata, message, status,
+ cancelled)` [where type corresponds to a grpc_op_type, metadata is a
+ sequence of 2-sequences of strings, message is a byte string, and status is
+ a 2-tuple of an integer corresponding to grpc_status_code and a string of
+ status details]).
+
+ Frees all resources associated with the event tag. */
+PyObject *pygrpc_consume_event(grpc_event event);
+
+/* Transliterate the Python tuple of form `(type, metadata, message,
+ status)` (where type is an integer corresponding to a grpc_op_type, metadata
+ is a sequence of 2-sequences of strings, message is a byte string, and
+ status is 2-tuple of an integer corresponding to grpc_status_code and a
+ string of status details) to a grpc_op suitable for use in a
+ grpc_call_start_batch invocation. The grpc_op is a 'directory' of resources
+ that must be freed after GRPC core is done with them.
+
+ Calls gpr_malloc (or the appropriate type-specific grpc_*_create function)
+ to populate the appropriate union-discriminated members of the op.
+
+ Returns true on success, false on failure. */
+int pygrpc_produce_op(PyObject *op, grpc_op *result);
+
+/* Discards all resources associated with the passed in op that was produced by
+ pygrpc_produce_op. */
+void pygrpc_discard_op(grpc_op op);
+
+/* Transliterate the grpc_ops (which have been sent through a
+ grpc_call_start_batch invocation and whose corresponding event has appeared
+ on a completion queue) to a Python tuple of form `(type, metadata, message,
+ status, cancelled)` (where type is an integer corresponding to a
+ grpc_op_type, metadata is a sequence of 2-sequences of strings, message is a
+ byte string, and status is 2-tuple of an integer corresponding to
+ grpc_status_code and a string of status details).
+
+ Calls gpr_free (or the appropriate type-specific grpc_*_destroy function) on
+ the appropriate union-discriminated populated members of the ops. */
+PyObject *pygrpc_consume_ops(grpc_op *op, size_t nops);
+
+/* Transliterate from a gpr_timespec to a double (in units of seconds, either
+ from the epoch if interpreted absolutely or as a delta otherwise). */
+double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec);
+
+/* Transliterate from a double (in units of seconds from the epoch if
+ interpreted absolutely or as a delta otherwise) to a gpr_timespec. */
+gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds);
+
+/* Returns true on success, false on failure. */
+int pygrpc_cast_pylist_to_send_metadata(
+ PyObject *pylist, grpc_metadata **metadata, size_t *count);
+/* Returns a metadata array as a Python object on success, else NULL. */
+PyObject *pygrpc_cast_metadata_array_to_pylist(grpc_metadata_array metadata);
+
+/* Transliterate from a list of python channel arguments (2-tuples of string
+ and string|integer|None) to a grpc_channel_args object. The strings placed
+ in the grpc_channel_args object's grpc_arg elements are views of the Python
+ object. The Python object must live long enough for the grpc_channel_args
+ to be used. Arguments set to None are silently ignored. Returns true on
+ success, false on failure. */
+int pygrpc_produce_channel_args(PyObject *py_args, grpc_channel_args *c_args);
+void pygrpc_discard_channel_args(grpc_channel_args args);
+
+/* Read the bytes from grpc_byte_buffer to a gpr_malloc'd array of bytes;
+ output to result and result_size. */
+void pygrpc_byte_buffer_to_bytes(
+ grpc_byte_buffer *buffer, char **result, size_t *result_size);
+
+
+/*========*/
+/* Module */
+/*========*/
+
+/* Returns 0 on success, -1 on failure. */
+int pygrpc_module_add_types(PyObject *module);
+
+#endif /* GRPC__ADAPTER__C_TYPES_H_ */
diff --git a/src/python/src/grpc/_adapter/_c/types/call.c b/src/python/src/grpc/_adapter/_c/types/call.c
new file mode 100644
index 0000000000..0739070044
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/call.c
@@ -0,0 +1,163 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+
+PyMethodDef pygrpc_Call_methods[] = {
+ {"start_batch", (PyCFunction)pygrpc_Call_start_batch, METH_KEYWORDS, ""},
+ {"cancel", (PyCFunction)pygrpc_Call_cancel, METH_KEYWORDS, ""},
+ {NULL}
+};
+const char pygrpc_Call_doc[] = "See grpc._adapter._types.Call.";
+PyTypeObject pygrpc_Call_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "Call", /* tp_name */
+ sizeof(Call), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_Call_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_Call_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_Call_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ 0 /* tp_new */
+};
+
+Call *pygrpc_Call_new_empty(CompletionQueue *cq) {
+ Call *call = (Call *)pygrpc_Call_type.tp_alloc(&pygrpc_Call_type, 0);
+ call->c_call = NULL;
+ call->cq = cq;
+ Py_XINCREF(call->cq);
+ return call;
+}
+void pygrpc_Call_dealloc(Call *self) {
+ if (self->c_call) {
+ grpc_call_destroy(self->c_call);
+ }
+ Py_XDECREF(self->cq);
+ self->ob_type->tp_free((PyObject *)self);
+}
+PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs) {
+ PyObject *op_list;
+ PyObject *user_tag;
+ grpc_op *ops;
+ size_t nops;
+ size_t i;
+ size_t j;
+ pygrpc_tag *tag;
+ grpc_call_error errcode;
+ static char *keywords[] = {"ops", "tag", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO:start_batch", keywords,
+ &op_list, &user_tag)) {
+ return NULL;
+ }
+ if (!PyList_Check(op_list)) {
+ PyErr_SetString(PyExc_TypeError, "expected a list of OpArgs");
+ return NULL;
+ }
+ nops = PyList_Size(op_list);
+ ops = gpr_malloc(sizeof(grpc_op) * nops);
+ for (i = 0; i < nops; ++i) {
+ PyObject *item = PyList_GET_ITEM(op_list, i);
+ if (!pygrpc_produce_op(item, &ops[i])) {
+ for (j = 0; j < i; ++j) {
+ pygrpc_discard_op(ops[j]);
+ }
+ return NULL;
+ }
+ }
+ tag = pygrpc_produce_batch_tag(user_tag, self, ops, nops);
+ errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag);
+ gpr_free(ops);
+ return PyInt_FromLong(errcode);
+}
+PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
+ PyObject *py_code = NULL;
+ grpc_call_error errcode;
+ int code;
+ char *details = NULL;
+ static char *keywords[] = {"code", "details", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|Os:start_batch", keywords,
+ &py_code, &details)) {
+ return NULL;
+ }
+ if (py_code != NULL && details != NULL) {
+ if (!PyInt_Check(py_code)) {
+ PyErr_SetString(PyExc_TypeError, "expected integer code");
+ return NULL;
+ }
+ code = PyInt_AsLong(py_code);
+ errcode = grpc_call_cancel_with_status(self->c_call, code, details);
+ } else if (py_code != NULL || details != NULL) {
+ PyErr_SetString(PyExc_ValueError,
+ "if `code` is specified, so must `details`");
+ return NULL;
+ } else {
+ errcode = grpc_call_cancel(self->c_call);
+ }
+ return PyInt_FromLong(errcode);
+}
diff --git a/src/python/src/grpc/_adapter/_c/types/channel.c b/src/python/src/grpc/_adapter/_c/types/channel.c
new file mode 100644
index 0000000000..c235597466
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/channel.c
@@ -0,0 +1,134 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+
+
+PyMethodDef pygrpc_Channel_methods[] = {
+ {"create_call", (PyCFunction)pygrpc_Channel_create_call, METH_KEYWORDS, ""},
+ {NULL}
+};
+const char pygrpc_Channel_doc[] = "See grpc._adapter._types.Channel.";
+PyTypeObject pygrpc_Channel_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "Channel", /* tp_name */
+ sizeof(Channel), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_Channel_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_Channel_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_Channel_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ (newfunc)pygrpc_Channel_new /* tp_new */
+};
+
+Channel *pygrpc_Channel_new(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ Channel *self;
+ const char *target;
+ PyObject *py_args;
+ ClientCredentials *creds = NULL;
+ grpc_channel_args c_args;
+ char *keywords[] = {"target", "args", "creds", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sO|O!:Channel", keywords,
+ &target, &py_args, &pygrpc_ClientCredentials_type, &creds)) {
+ return NULL;
+ }
+ if (!pygrpc_produce_channel_args(py_args, &c_args)) {
+ return NULL;
+ }
+ self = (Channel *)type->tp_alloc(type, 0);
+ if (creds) {
+ self->c_chan = grpc_secure_channel_create(creds->c_creds, target, &c_args);
+ } else {
+ self->c_chan = grpc_channel_create(target, &c_args);
+ }
+ pygrpc_discard_channel_args(c_args);
+ return self;
+}
+void pygrpc_Channel_dealloc(Channel *self) {
+ grpc_channel_destroy(self->c_chan);
+ self->ob_type->tp_free((PyObject *)self);
+}
+
+Call *pygrpc_Channel_create_call(
+ Channel *self, PyObject *args, PyObject *kwargs) {
+ Call *call;
+ CompletionQueue *cq;
+ const char *method;
+ const char *host;
+ double deadline;
+ char *keywords[] = {"cq", "method", "host", "deadline", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!ssd:create_call", keywords,
+ &pygrpc_CompletionQueue_type, &cq, &method, &host, &deadline)) {
+ return NULL;
+ }
+ call = pygrpc_Call_new_empty(cq);
+ call->c_call = grpc_channel_create_call(
+ self->c_chan, cq->c_cq, method, host,
+ pygrpc_cast_double_to_gpr_timespec(deadline));
+ return call;
+}
diff --git a/src/python/src/grpc/_adapter/_c/types/client_credentials.c b/src/python/src/grpc/_adapter/_c/types/client_credentials.c
new file mode 100644
index 0000000000..6a4561c060
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/client_credentials.c
@@ -0,0 +1,286 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/grpc_security.h>
+
+
+PyMethodDef pygrpc_ClientCredentials_methods[] = {
+ {"google_default", (PyCFunction)pygrpc_ClientCredentials_google_default,
+ METH_CLASS|METH_NOARGS, ""},
+ {"ssl", (PyCFunction)pygrpc_ClientCredentials_ssl,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"composite", (PyCFunction)pygrpc_ClientCredentials_composite,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"compute_engine", (PyCFunction)pygrpc_ClientCredentials_compute_engine,
+ METH_CLASS|METH_NOARGS, ""},
+ {"service_account", (PyCFunction)pygrpc_ClientCredentials_service_account,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"jwt", (PyCFunction)pygrpc_ClientCredentials_jwt,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"refresh_token", (PyCFunction)pygrpc_ClientCredentials_refresh_token,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"fake_transport_security",
+ (PyCFunction)pygrpc_ClientCredentials_fake_transport_security,
+ METH_CLASS|METH_NOARGS, ""},
+ {"iam", (PyCFunction)pygrpc_ClientCredentials_iam,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {NULL}
+};
+const char pygrpc_ClientCredentials_doc[] = "";
+PyTypeObject pygrpc_ClientCredentials_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "ClientCredentials", /* tp_name */
+ sizeof(ClientCredentials), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_ClientCredentials_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_ClientCredentials_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_ClientCredentials_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ 0 /* tp_new */
+};
+
+void pygrpc_ClientCredentials_dealloc(ClientCredentials *self) {
+ grpc_credentials_release(self->c_creds);
+ self->ob_type->tp_free((PyObject *)self);
+}
+
+ClientCredentials *pygrpc_ClientCredentials_google_default(
+ PyTypeObject *type, PyObject *ignored) {
+ ClientCredentials *self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_google_default_credentials_create();
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError,
+ "couldn't create Google default credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_ssl(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ const char *root_certs;
+ const char *private_key = NULL;
+ const char *cert_chain = NULL;
+ grpc_ssl_pem_key_cert_pair key_cert_pair;
+ static char *keywords[] = {"root_certs", "private_key", "cert_chain", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "z|zz:ssl", keywords,
+ &root_certs, &private_key, &cert_chain)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ if (private_key && cert_chain) {
+ key_cert_pair.private_key = private_key;
+ key_cert_pair.cert_chain = cert_chain;
+ self->c_creds = grpc_ssl_credentials_create(root_certs, &key_cert_pair);
+ } else {
+ self->c_creds = grpc_ssl_credentials_create(root_certs, NULL);
+ }
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError, "couldn't create ssl credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_composite(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ ClientCredentials *creds1;
+ ClientCredentials *creds2;
+ static char *keywords[] = {"creds1", "creds2", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O!:composite", keywords,
+ &pygrpc_ClientCredentials_type, &creds1,
+ &pygrpc_ClientCredentials_type, &creds2)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_composite_credentials_create(
+ creds1->c_creds, creds2->c_creds);
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError, "couldn't create composite credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_compute_engine(
+ PyTypeObject *type, PyObject *ignored) {
+ ClientCredentials *self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_compute_engine_credentials_create();
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError,
+ "couldn't create compute engine credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_service_account(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ const char *json_key;
+ const char *scope;
+ double lifetime;
+ static char *keywords[] = {"json_key", "scope", "token_lifetime", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ssd:service_account", keywords,
+ &json_key, &scope, &lifetime)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_service_account_credentials_create(
+ json_key, scope, pygrpc_cast_double_to_gpr_timespec(lifetime));
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError,
+ "couldn't create service account credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_jwt(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ const char *json_key;
+ double lifetime;
+ static char *keywords[] = {"json_key", "token_lifetime", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sd:jwt", keywords,
+ &json_key, &lifetime)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_jwt_credentials_create(
+ json_key, pygrpc_cast_double_to_gpr_timespec(lifetime));
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError, "couldn't create JWT credentials");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_refresh_token(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ const char *json_refresh_token;
+ static char *keywords[] = {"json_refresh_token", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s:refresh_token", keywords,
+ &json_refresh_token)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_refresh_token_credentials_create(json_refresh_token);
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError,
+ "couldn't create credentials from refresh token");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_fake_transport_security(
+ PyTypeObject *type, PyObject *ignored) {
+ ClientCredentials *self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_fake_transport_security_credentials_create();
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError,
+ "couldn't create fake credentials; "
+ "something is horribly wrong with the universe");
+ return NULL;
+ }
+ return self;
+}
+
+ClientCredentials *pygrpc_ClientCredentials_iam(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ClientCredentials *self;
+ const char *authorization_token;
+ const char *authority_selector;
+ static char *keywords[] = {"authorization_token", "authority_selector", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss:iam", keywords,
+ &authorization_token, &authority_selector)) {
+ return NULL;
+ }
+ self = (ClientCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_iam_credentials_create(authorization_token,
+ authority_selector);
+ if (!self->c_creds) {
+ Py_DECREF(self);
+ PyErr_SetString(PyExc_RuntimeError, "couldn't create IAM credentials");
+ return NULL;
+ }
+ return self;
+}
+
diff --git a/src/python/src/grpc/_adapter/_c/types/completion_queue.c b/src/python/src/grpc/_adapter/_c/types/completion_queue.c
new file mode 100644
index 0000000000..2dd44b6ddd
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/completion_queue.c
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+
+
+PyMethodDef pygrpc_CompletionQueue_methods[] = {
+ {"next", (PyCFunction)pygrpc_CompletionQueue_next, METH_KEYWORDS, ""},
+ {"shutdown", (PyCFunction)pygrpc_CompletionQueue_shutdown, METH_NOARGS, ""},
+ {NULL}
+};
+const char pygrpc_CompletionQueue_doc[] =
+ "See grpc._adapter._types.CompletionQueue.";
+PyTypeObject pygrpc_CompletionQueue_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "CompletionQueue", /* tp_name */
+ sizeof(CompletionQueue), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_CompletionQueue_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_CompletionQueue_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_CompletionQueue_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ (newfunc)pygrpc_CompletionQueue_new /* tp_new */
+};
+
+CompletionQueue *pygrpc_CompletionQueue_new(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ CompletionQueue *self = (CompletionQueue *)type->tp_alloc(type, 0);
+ self->c_cq = grpc_completion_queue_create();
+ return self;
+}
+
+void pygrpc_CompletionQueue_dealloc(CompletionQueue *self) {
+ grpc_completion_queue_destroy(self->c_cq);
+ self->ob_type->tp_free((PyObject *)self);
+}
+
+PyObject *pygrpc_CompletionQueue_next(
+ CompletionQueue *self, PyObject *args, PyObject *kwargs) {
+ double deadline;
+ grpc_event event;
+ PyObject *transliterated_event;
+ static char *keywords[] = {"deadline", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d:next", keywords,
+ &deadline)) {
+ return NULL;
+ }
+ Py_BEGIN_ALLOW_THREADS;
+ event = grpc_completion_queue_next(
+ self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline));
+ Py_END_ALLOW_THREADS;
+ transliterated_event = pygrpc_consume_event(event);
+ return transliterated_event;
+}
+
+PyObject *pygrpc_CompletionQueue_shutdown(
+ CompletionQueue *self, PyObject *ignored) {
+ grpc_completion_queue_shutdown(self->c_cq);
+ Py_RETURN_NONE;
+}
diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c
new file mode 100644
index 0000000000..65d84b58fe
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/server.c
@@ -0,0 +1,183 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+
+
+PyMethodDef pygrpc_Server_methods[] = {
+ {"request_call", (PyCFunction)pygrpc_Server_request_call,
+ METH_KEYWORDS, ""},
+ {"add_http2_port", (PyCFunction)pygrpc_Server_add_http2_port,
+ METH_KEYWORDS, ""},
+ {"start", (PyCFunction)pygrpc_Server_start, METH_NOARGS, ""},
+ {"shutdown", (PyCFunction)pygrpc_Server_shutdown, METH_KEYWORDS, ""},
+ {NULL}
+};
+const char pygrpc_Server_doc[] = "See grpc._adapter._types.Server.";
+PyTypeObject pygrpc_Server_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "Server", /* tp_name */
+ sizeof(Server), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_Server_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_Server_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_Server_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ (newfunc)pygrpc_Server_new /* tp_new */
+};
+
+Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ Server *self;
+ CompletionQueue *cq;
+ PyObject *py_args;
+ grpc_channel_args c_args;
+ char *keywords[] = {"cq", "args", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O!O:Channel", keywords,
+ &pygrpc_CompletionQueue_type, &cq, &py_args)) {
+ return NULL;
+ }
+ if (!pygrpc_produce_channel_args(py_args, &c_args)) {
+ return NULL;
+ }
+ self = (Server *)type->tp_alloc(type, 0);
+ self->c_serv = grpc_server_create(&c_args);
+ pygrpc_discard_channel_args(c_args);
+ self->cq = cq;
+ Py_INCREF(self->cq);
+ return self;
+}
+
+void pygrpc_Server_dealloc(Server *self) {
+ grpc_server_destroy(self->c_serv);
+ Py_XDECREF(self->cq);
+ self->ob_type->tp_free((PyObject *)self);
+}
+
+PyObject *pygrpc_Server_request_call(
+ Server *self, PyObject *args, PyObject *kwargs) {
+ CompletionQueue *cq;
+ PyObject *user_tag;
+ pygrpc_tag *tag;
+ Call *empty_call;
+ grpc_call_error errcode;
+ static char *keywords[] = {"cq", "tag", NULL};
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwargs, "O!O", keywords,
+ &pygrpc_CompletionQueue_type, &cq, &user_tag)) {
+ return NULL;
+ }
+ empty_call = pygrpc_Call_new_empty(cq);
+ tag = pygrpc_produce_request_tag(user_tag, empty_call);
+ errcode = grpc_server_request_call(
+ self->c_serv, &tag->call->c_call, &tag->request_call_details,
+ &tag->request_metadata, tag->call->cq->c_cq, self->cq->c_cq, tag);
+ Py_DECREF(empty_call);
+ return PyInt_FromLong(errcode);
+}
+
+PyObject *pygrpc_Server_add_http2_port(
+ Server *self, PyObject *args, PyObject *kwargs) {
+ const char *addr;
+ ServerCredentials *creds = NULL;
+ int port;
+ static char *keywords[] = {"addr", "creds", NULL};
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwargs, "s|O!:add_http2_port", keywords,
+ &addr, &pygrpc_ServerCredentials_type, &creds)) {
+ return NULL;
+ }
+ if (creds) {
+ port = grpc_server_add_secure_http2_port(
+ self->c_serv, addr, creds->c_creds);
+ } else {
+ port = grpc_server_add_http2_port(self->c_serv, addr);
+ }
+ return PyInt_FromLong(port);
+
+}
+
+PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) {
+ grpc_server_start(self->c_serv);
+ Py_RETURN_NONE;
+}
+
+PyObject *pygrpc_Server_shutdown(
+ Server *self, PyObject *args, PyObject *kwargs) {
+ PyObject *user_tag = NULL;
+ pygrpc_tag *tag;
+ static char *keywords[] = {"tag", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", keywords, &user_tag)) {
+ return NULL;
+ }
+ if (user_tag) {
+ tag = pygrpc_produce_server_shutdown_tag(user_tag);
+ grpc_server_shutdown_and_notify(self->c_serv, tag);
+ } else {
+ grpc_server_shutdown(self->c_serv);
+ }
+ Py_RETURN_NONE;
+}
diff --git a/src/python/src/grpc/_adapter/_c/types/server_credentials.c b/src/python/src/grpc/_adapter/_c/types/server_credentials.c
new file mode 100644
index 0000000000..2e02c8fe81
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/types/server_credentials.c
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_c/types.h"
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/grpc_security.h>
+#include <grpc/support/alloc.h>
+
+
+PyMethodDef pygrpc_ServerCredentials_methods[] = {
+ {"ssl", (PyCFunction)pygrpc_ServerCredentials_ssl,
+ METH_CLASS|METH_KEYWORDS, ""},
+ {"fake_transport_security",
+ (PyCFunction)pygrpc_ServerCredentials_fake_transport_security,
+ METH_CLASS|METH_NOARGS, ""},
+ {NULL}
+};
+const char pygrpc_ServerCredentials_doc[] = "";
+PyTypeObject pygrpc_ServerCredentials_type = {
+ PyObject_HEAD_INIT(NULL)
+ 0, /* ob_size */
+ "ServerCredentials", /* tp_name */
+ sizeof(ServerCredentials), /* tp_basicsize */
+ 0, /* tp_itemsize */
+ (destructor)pygrpc_ServerCredentials_dealloc, /* tp_dealloc */
+ 0, /* tp_print */
+ 0, /* tp_getattr */
+ 0, /* tp_setattr */
+ 0, /* tp_compare */
+ 0, /* tp_repr */
+ 0, /* tp_as_number */
+ 0, /* tp_as_sequence */
+ 0, /* tp_as_mapping */
+ 0, /* tp_hash */
+ 0, /* tp_call */
+ 0, /* tp_str */
+ 0, /* tp_getattro */
+ 0, /* tp_setattro */
+ 0, /* tp_as_buffer */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
+ pygrpc_ServerCredentials_doc, /* tp_doc */
+ 0, /* tp_traverse */
+ 0, /* tp_clear */
+ 0, /* tp_richcompare */
+ 0, /* tp_weaklistoffset */
+ 0, /* tp_iter */
+ 0, /* tp_iternext */
+ pygrpc_ServerCredentials_methods, /* tp_methods */
+ 0, /* tp_members */
+ 0, /* tp_getset */
+ 0, /* tp_base */
+ 0, /* tp_dict */
+ 0, /* tp_descr_get */
+ 0, /* tp_descr_set */
+ 0, /* tp_dictoffset */
+ 0, /* tp_init */
+ 0, /* tp_alloc */
+ 0 /* tp_new */
+};
+
+void pygrpc_ServerCredentials_dealloc(ServerCredentials *self) {
+ grpc_server_credentials_release(self->c_creds);
+ self->ob_type->tp_free((PyObject *)self);
+}
+
+ServerCredentials *pygrpc_ServerCredentials_ssl(
+ PyTypeObject *type, PyObject *args, PyObject *kwargs) {
+ ServerCredentials *self;
+ const char *root_certs;
+ PyObject *py_key_cert_pairs;
+ grpc_ssl_pem_key_cert_pair *key_cert_pairs;
+ size_t num_key_cert_pairs;
+ size_t i;
+ static char *keywords[] = {"root_certs", "key_cert_pairs", NULL};
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "zO:ssl", keywords,
+ &root_certs, &py_key_cert_pairs)) {
+ return NULL;
+ }
+ if (!PyList_Check(py_key_cert_pairs)) {
+ PyErr_SetString(PyExc_TypeError, "expected a list of 2-tuples of strings");
+ return NULL;
+ }
+ num_key_cert_pairs = PyList_Size(py_key_cert_pairs);
+ key_cert_pairs =
+ gpr_malloc(sizeof(grpc_ssl_pem_key_cert_pair) * num_key_cert_pairs);
+ for (i = 0; i < num_key_cert_pairs; ++i) {
+ PyObject *item = PyList_GET_ITEM(py_key_cert_pairs, i);
+ const char *key;
+ const char *cert;
+ if (!PyArg_ParseTuple(item, "zz", &key, &cert)) {
+ gpr_free(key_cert_pairs);
+ PyErr_SetString(PyExc_TypeError,
+ "expected a list of 2-tuples of strings");
+ return NULL;
+ }
+ key_cert_pairs[i].private_key = key;
+ key_cert_pairs[i].cert_chain = cert;
+ }
+
+ self = (ServerCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_ssl_server_credentials_create(
+ root_certs, key_cert_pairs, num_key_cert_pairs);
+ gpr_free(key_cert_pairs);
+ return self;
+}
+
+ServerCredentials *pygrpc_ServerCredentials_fake_transport_security(
+ PyTypeObject *type, PyObject *ignored) {
+ ServerCredentials *self = (ServerCredentials *)type->tp_alloc(type, 0);
+ self->c_creds = grpc_fake_transport_security_server_credentials_create();
+ return self;
+}
+
diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c
new file mode 100644
index 0000000000..6d228c73fe
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_c/utility.c
@@ -0,0 +1,460 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <math.h>
+
+#define PY_SSIZE_T_CLEAN
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/time.h>
+
+#include "grpc/_adapter/_c/types.h"
+
+pygrpc_tag *pygrpc_produce_batch_tag(
+ PyObject *user_tag, Call *call, grpc_op *ops, size_t nops) {
+ pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
+ tag->user_tag = user_tag;
+ Py_XINCREF(tag->user_tag);
+ tag->call = call;
+ Py_XINCREF(tag->call);
+ tag->ops = gpr_malloc(sizeof(grpc_op)*nops);
+ memcpy(tag->ops, ops, sizeof(grpc_op)*nops);
+ tag->nops = nops;
+ grpc_call_details_init(&tag->request_call_details);
+ grpc_metadata_array_init(&tag->request_metadata);
+ tag->is_new_call = 0;
+ return tag;
+}
+
+pygrpc_tag *pygrpc_produce_request_tag(PyObject *user_tag, Call *empty_call) {
+ pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
+ tag->user_tag = user_tag;
+ Py_XINCREF(tag->user_tag);
+ tag->call = empty_call;
+ Py_XINCREF(tag->call);
+ tag->ops = NULL;
+ tag->nops = 0;
+ grpc_call_details_init(&tag->request_call_details);
+ grpc_metadata_array_init(&tag->request_metadata);
+ tag->is_new_call = 1;
+ return tag;
+}
+
+pygrpc_tag *pygrpc_produce_server_shutdown_tag(PyObject *user_tag) {
+ pygrpc_tag *tag = gpr_malloc(sizeof(pygrpc_tag));
+ tag->user_tag = user_tag;
+ Py_XINCREF(tag->user_tag);
+ tag->call = NULL;
+ tag->ops = NULL;
+ tag->nops = 0;
+ grpc_call_details_init(&tag->request_call_details);
+ grpc_metadata_array_init(&tag->request_metadata);
+ tag->is_new_call = 0;
+ return tag;
+}
+
+void pygrpc_discard_tag(pygrpc_tag *tag) {
+ if (!tag) {
+ return;
+ }
+ Py_XDECREF(tag->user_tag);
+ Py_XDECREF(tag->call);
+ gpr_free(tag->ops);
+ grpc_call_details_destroy(&tag->request_call_details);
+ grpc_metadata_array_destroy(&tag->request_metadata);
+ gpr_free(tag);
+}
+
+PyObject *pygrpc_consume_event(grpc_event event) {
+ pygrpc_tag *tag;
+ PyObject *result;
+ if (event.type == GRPC_QUEUE_TIMEOUT) {
+ Py_RETURN_NONE;
+ }
+ tag = event.tag;
+ switch (event.type) {
+ case GRPC_QUEUE_SHUTDOWN:
+ result = Py_BuildValue("iOOOOO", GRPC_QUEUE_SHUTDOWN,
+ Py_None, Py_None, Py_None, Py_None, Py_True);
+ break;
+ case GRPC_OP_COMPLETE:
+ if (tag->is_new_call) {
+ result = Py_BuildValue(
+ "iOO(ssd)[(iNOOOO)]O", GRPC_OP_COMPLETE, tag->user_tag, tag->call,
+ tag->request_call_details.method, tag->request_call_details.host,
+ pygrpc_cast_gpr_timespec_to_double(tag->request_call_details.deadline),
+ GRPC_OP_RECV_INITIAL_METADATA,
+ pygrpc_cast_metadata_array_to_pylist(tag->request_metadata), Py_None,
+ Py_None, Py_None, Py_None,
+ event.success ? Py_True : Py_False);
+ } else {
+ result = Py_BuildValue("iOOONO", GRPC_OP_COMPLETE, tag->user_tag,
+ tag->call, Py_None, pygrpc_consume_ops(tag->ops, tag->nops),
+ event.success ? Py_True : Py_False);
+ }
+ break;
+ default:
+ PyErr_SetString(PyExc_ValueError,
+ "unknown completion type; could not translate event");
+ return NULL;
+ }
+ pygrpc_discard_tag(tag);
+ return result;
+}
+
+int pygrpc_produce_op(PyObject *op, grpc_op *result) {
+ static const int OP_TUPLE_SIZE = 5;
+ static const int STATUS_TUPLE_SIZE = 2;
+ static const int TYPE_INDEX = 0;
+ static const int INITIAL_METADATA_INDEX = 1;
+ static const int TRAILING_METADATA_INDEX = 2;
+ static const int MESSAGE_INDEX = 3;
+ static const int STATUS_INDEX = 4;
+ static const int STATUS_CODE_INDEX = 0;
+ static const int STATUS_DETAILS_INDEX = 1;
+ grpc_op c_op;
+ if (!PyTuple_Check(op)) {
+ PyErr_SetString(PyExc_TypeError, "expected tuple op");
+ return 0;
+ }
+ if (PyTuple_Size(op) != OP_TUPLE_SIZE) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE);
+ PyErr_SetString(PyExc_ValueError, buf);
+ return 0;
+ }
+ int type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX));
+ if (PyErr_Occurred()) {
+ return 0;
+ }
+ Py_ssize_t message_size;
+ char *message;
+ char *status_details;
+ gpr_slice message_slice;
+ c_op.op = type;
+ switch (type) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ if (!pygrpc_cast_pylist_to_send_metadata(
+ PyTuple_GetItem(op, INITIAL_METADATA_INDEX),
+ &c_op.data.send_initial_metadata.metadata,
+ &c_op.data.send_initial_metadata.count)) {
+ return 0;
+ }
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ PyString_AsStringAndSize(
+ PyTuple_GET_ITEM(op, MESSAGE_INDEX), &message, &message_size);
+ message_slice = gpr_slice_from_copied_buffer(message, message_size);
+ c_op.data.send_message = grpc_byte_buffer_create(&message_slice, 1);
+ gpr_slice_unref(message_slice);
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Don't need to fill in any other fields. */
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ if (!pygrpc_cast_pylist_to_send_metadata(
+ PyTuple_GetItem(op, TRAILING_METADATA_INDEX),
+ &c_op.data.send_status_from_server.trailing_metadata,
+ &c_op.data.send_status_from_server.trailing_metadata_count)) {
+ return 0;
+ }
+ if (!PyTuple_Check(PyTuple_GET_ITEM(op, STATUS_INDEX))) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "expected tuple status in op of length %d",
+ STATUS_TUPLE_SIZE);
+ PyErr_SetString(PyExc_TypeError, buf);
+ return 0;
+ }
+ c_op.data.send_status_from_server.status = PyInt_AsLong(
+ PyTuple_GET_ITEM(PyTuple_GET_ITEM(op, STATUS_INDEX), STATUS_CODE_INDEX));
+ status_details = PyString_AsString(
+ PyTuple_GET_ITEM(PyTuple_GET_ITEM(op, STATUS_INDEX), STATUS_DETAILS_INDEX));
+ if (PyErr_Occurred()) {
+ return 0;
+ }
+ c_op.data.send_status_from_server.status_details =
+ gpr_malloc(strlen(status_details) + 1);
+ strcpy((char *)c_op.data.send_status_from_server.status_details,
+ status_details);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ c_op.data.recv_initial_metadata = gpr_malloc(sizeof(grpc_metadata_array));
+ grpc_metadata_array_init(c_op.data.recv_initial_metadata);
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ c_op.data.recv_message = gpr_malloc(sizeof(grpc_byte_buffer *));
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ c_op.data.recv_status_on_client.trailing_metadata =
+ gpr_malloc(sizeof(grpc_metadata_array));
+ grpc_metadata_array_init(c_op.data.recv_status_on_client.trailing_metadata);
+ c_op.data.recv_status_on_client.status =
+ gpr_malloc(sizeof(grpc_status_code *));
+ c_op.data.recv_status_on_client.status_details =
+ gpr_malloc(sizeof(char *));
+ *c_op.data.recv_status_on_client.status_details = NULL;
+ c_op.data.recv_status_on_client.status_details_capacity =
+ gpr_malloc(sizeof(size_t));
+ *c_op.data.recv_status_on_client.status_details_capacity = 0;
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ c_op.data.recv_close_on_server.cancelled = gpr_malloc(sizeof(int));
+ break;
+ default:
+ return 0;
+ }
+ *result = c_op;
+ return 1;
+}
+
+void pygrpc_discard_op(grpc_op op) {
+ switch(op.op) {
+ case GRPC_OP_SEND_INITIAL_METADATA:
+ gpr_free(op.data.send_initial_metadata.metadata);
+ break;
+ case GRPC_OP_SEND_MESSAGE:
+ grpc_byte_buffer_destroy(op.data.send_message);
+ break;
+ case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
+ /* Don't need to free any fields. */
+ break;
+ case GRPC_OP_SEND_STATUS_FROM_SERVER:
+ gpr_free(op.data.send_status_from_server.trailing_metadata);
+ gpr_free((char *)op.data.send_status_from_server.status_details);
+ break;
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ grpc_metadata_array_destroy(op.data.recv_initial_metadata);
+ gpr_free(op.data.recv_initial_metadata);
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ grpc_byte_buffer_destroy(*op.data.recv_message);
+ gpr_free(op.data.recv_message);
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ grpc_metadata_array_destroy(op.data.recv_status_on_client.trailing_metadata);
+ gpr_free(op.data.recv_status_on_client.trailing_metadata);
+ gpr_free(op.data.recv_status_on_client.status);
+ gpr_free(*op.data.recv_status_on_client.status_details);
+ gpr_free(op.data.recv_status_on_client.status_details);
+ gpr_free(op.data.recv_status_on_client.status_details_capacity);
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ gpr_free(op.data.recv_close_on_server.cancelled);
+ break;
+ }
+}
+
+PyObject *pygrpc_consume_ops(grpc_op *op, size_t nops) {
+ static const int TYPE_INDEX = 0;
+ static const int INITIAL_METADATA_INDEX = 1;
+ static const int TRAILING_METADATA_INDEX = 2;
+ static const int MESSAGE_INDEX = 3;
+ static const int STATUS_INDEX = 4;
+ static const int CANCELLED_INDEX = 5;
+ static const int OPRESULT_LENGTH = 6;
+ PyObject *list;
+ size_t i;
+ size_t j;
+ char *bytes;
+ size_t bytes_size;
+ PyObject *results = PyList_New(nops);
+ if (!results) {
+ return NULL;
+ }
+ for (i = 0; i < nops; ++i) {
+ PyObject *result = PyTuple_Pack(OPRESULT_LENGTH, Py_None, Py_None, Py_None,
+ Py_None, Py_None, Py_None);
+ PyTuple_SetItem(result, TYPE_INDEX, PyInt_FromLong(op[i].op));
+ switch(op[i].op) {
+ case GRPC_OP_RECV_INITIAL_METADATA:
+ PyTuple_SetItem(result, INITIAL_METADATA_INDEX,
+ list=PyList_New(op[i].data.recv_initial_metadata->count));
+ for (j = 0; j < op[i].data.recv_initial_metadata->count; ++j) {
+ grpc_metadata md = op[i].data.recv_initial_metadata->metadata[j];
+ PyList_SetItem(list, j, Py_BuildValue("ss#", md.key, md.value,
+ (Py_ssize_t)md.value_length));
+ }
+ break;
+ case GRPC_OP_RECV_MESSAGE:
+ if (*op[i].data.recv_message) {
+ pygrpc_byte_buffer_to_bytes(
+ *op[i].data.recv_message, &bytes, &bytes_size);
+ PyTuple_SetItem(result, MESSAGE_INDEX,
+ PyString_FromStringAndSize(bytes, bytes_size));
+ gpr_free(bytes);
+ } else {
+ PyTuple_SetItem(result, MESSAGE_INDEX, Py_BuildValue(""));
+ }
+ break;
+ case GRPC_OP_RECV_STATUS_ON_CLIENT:
+ PyTuple_SetItem(
+ result, TRAILING_METADATA_INDEX,
+ list = PyList_New(op[i].data.recv_status_on_client.trailing_metadata->count));
+ for (j = 0; j < op[i].data.recv_status_on_client.trailing_metadata->count; ++j) {
+ grpc_metadata md =
+ op[i].data.recv_status_on_client.trailing_metadata->metadata[j];
+ PyList_SetItem(list, j, Py_BuildValue("ss#", md.key, md.value,
+ (Py_ssize_t)md.value_length));
+ }
+ PyTuple_SetItem(
+ result, STATUS_INDEX, Py_BuildValue(
+ "is", *op[i].data.recv_status_on_client.status,
+ *op[i].data.recv_status_on_client.status_details));
+ break;
+ case GRPC_OP_RECV_CLOSE_ON_SERVER:
+ PyTuple_SetItem(
+ result, CANCELLED_INDEX,
+ PyBool_FromLong(*op[i].data.recv_close_on_server.cancelled));
+ break;
+ default:
+ break;
+ }
+ pygrpc_discard_op(op[i]);
+ PyList_SetItem(results, i, result);
+ }
+ return results;
+}
+
+double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) {
+ return timespec.tv_sec + 1e-9*timespec.tv_nsec;
+}
+
+gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) {
+ gpr_timespec result;
+ if isinf(seconds) {
+ result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past;
+ } else {
+ result.tv_sec = (time_t)seconds;
+ result.tv_nsec = ((seconds - result.tv_sec) * 1e9);
+ }
+ return result;
+}
+
+int pygrpc_produce_channel_args(PyObject *py_args, grpc_channel_args *c_args) {
+ size_t num_args = PyList_Size(py_args);
+ size_t i;
+ grpc_channel_args args = {num_args, gpr_malloc(sizeof(grpc_arg) * num_args)};
+ for (i = 0; i < args.num_args; ++i) {
+ char *key;
+ PyObject *value;
+ if (!PyArg_ParseTuple(PyList_GetItem(py_args, i), "zO", &key, &value)) {
+ gpr_free(args.args);
+ args.num_args = 0;
+ args.args = NULL;
+ PyErr_SetString(PyExc_TypeError,
+ "expected a list of 2-tuple of str and str|int|None");
+ return 0;
+ }
+ args.args[i].key = key;
+ if (PyInt_Check(value)) {
+ args.args[i].type = GRPC_ARG_INTEGER;
+ args.args[i].value.integer = PyInt_AsLong(value);
+ } else if (PyString_Check(value)) {
+ args.args[i].type = GRPC_ARG_STRING;
+ args.args[i].value.string = PyString_AsString(value);
+ } else if (value == Py_None) {
+ --args.num_args;
+ --i;
+ continue;
+ } else {
+ gpr_free(args.args);
+ args.num_args = 0;
+ args.args = NULL;
+ PyErr_SetString(PyExc_TypeError,
+ "expected a list of 2-tuple of str and str|int|None");
+ return 0;
+ }
+ }
+ *c_args = args;
+ return 1;
+}
+
+void pygrpc_discard_channel_args(grpc_channel_args args) {
+ gpr_free(args.args);
+}
+
+int pygrpc_cast_pylist_to_send_metadata(
+ PyObject *pylist, grpc_metadata **metadata, size_t *count) {
+ size_t i;
+ Py_ssize_t value_length;
+ *count = PyList_Size(pylist);
+ *metadata = gpr_malloc(sizeof(grpc_metadata) * *count);
+ for (i = 0; i < *count; ++i) {
+ if (!PyArg_ParseTuple(
+ PyList_GetItem(pylist, i), "ss#",
+ &(*metadata)[i].key, &(*metadata)[i].value, &value_length)) {
+ gpr_free(*metadata);
+ *count = 0;
+ *metadata = NULL;
+ return 0;
+ }
+ (*metadata)[i].value_length = value_length;
+ }
+ return 1;
+}
+
+PyObject *pygrpc_cast_metadata_array_to_pylist(grpc_metadata_array metadata) {
+ PyObject *result = PyList_New(metadata.count);
+ size_t i;
+ for (i = 0; i < metadata.count; ++i) {
+ PyList_SetItem(
+ result, i, Py_BuildValue(
+ "ss#", metadata.metadata[i].key, metadata.metadata[i].value,
+ (Py_ssize_t)metadata.metadata[i].value_length));
+ if (PyErr_Occurred()) {
+ Py_DECREF(result);
+ return NULL;
+ }
+ }
+ return result;
+}
+
+void pygrpc_byte_buffer_to_bytes(
+ grpc_byte_buffer *buffer, char **result, size_t *result_size) {
+ grpc_byte_buffer_reader *reader = grpc_byte_buffer_reader_create(buffer);
+ gpr_slice slice;
+ char *read_result = NULL;
+ size_t size = 0;
+ while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ read_result = gpr_realloc(read_result, size + GPR_SLICE_LENGTH(slice));
+ memcpy(read_result + size, GPR_SLICE_START_PTR(slice),
+ GPR_SLICE_LENGTH(slice));
+ size = size + GPR_SLICE_LENGTH(slice);
+ gpr_slice_unref(slice);
+ }
+ grpc_byte_buffer_reader_destroy(reader);
+ *result_size = size;
+ *result = read_result;
+}
diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py
index b06215f0e5..133b124072 100644
--- a/src/python/src/grpc/_adapter/_c_test.py
+++ b/src/python/src/grpc/_adapter/_c_test.py
@@ -27,192 +27,40 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests for _adapter._c."""
-
-import threading
import time
import unittest
from grpc._adapter import _c
-from grpc._adapter import _datatypes
-
-_TIMEOUT = 3
-_FUTURE = time.time() + 60 * 60 * 24
-_IDEMPOTENCE_DEMONSTRATION = 7
-
-
-class _CTest(unittest.TestCase):
-
- def testUpAndDown(self):
- _c.init()
- _c.shut_down()
-
- def testCompletionQueue(self):
- _c.init()
-
- completion_queue = _c.CompletionQueue()
- event = completion_queue.get(0)
- self.assertIsNone(event)
- event = completion_queue.get(time.time())
- self.assertIsNone(event)
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIsNone(event)
- completion_queue.stop()
- for _ in range(_IDEMPOTENCE_DEMONSTRATION):
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
-
- del completion_queue
- del event
+from grpc._adapter import _types
- _c.shut_down()
- def testChannel(self):
- _c.init()
-
- channel = _c.Channel(
- 'test host:12345', None, server_host_override='ignored')
- del channel
-
- _c.shut_down()
-
- def testCall(self):
- method = 'test method'
- host = 'test host'
-
- _c.init()
-
- channel = _c.Channel('%s:%d' % (host, 12345), None)
- completion_queue = _c.CompletionQueue()
- call = _c.Call(channel, completion_queue, method, host,
- time.time() + _TIMEOUT)
- del call
- del completion_queue
- del channel
+class CTypeSmokeTest(unittest.TestCase):
- _c.shut_down()
+ def testClientCredentialsUpDown(self):
+ credentials = _c.ClientCredentials.fake_transport_security()
+ del credentials
- def testServer(self):
- _c.init()
+ def testServerCredentialsUpDown(self):
+ credentials = _c.ServerCredentials.fake_transport_security()
+ del credentials
+ def testCompletionQueueUpDown(self):
completion_queue = _c.CompletionQueue()
- server = _c.Server(completion_queue)
- server.add_http2_addr('[::]:0')
- server.start()
- server.stop()
- completion_queue.stop()
- del server
del completion_queue
- service_tag = object()
+ def testServerUpDown(self):
completion_queue = _c.CompletionQueue()
- server = _c.Server(completion_queue)
- server.add_http2_addr('[::]:0')
- server.start()
- server.service(service_tag)
- server.stop()
- completion_queue.stop()
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIs(event.kind, _datatypes.Event.Kind.SERVICE_ACCEPTED)
- self.assertIs(event.tag, service_tag)
- self.assertIsNone(event.service_acceptance)
- for _ in range(_IDEMPOTENCE_DEMONSTRATION):
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
- del server
+ serv = _c.Server(completion_queue, [])
+ del serv
del completion_queue
- completion_queue = _c.CompletionQueue()
- server = _c.Server(completion_queue)
- server.add_http2_addr('[::]:0')
- server.start()
- thread = threading.Thread(target=completion_queue.get, args=(_FUTURE,))
- thread.start()
- time.sleep(1)
- server.stop()
- completion_queue.stop()
- for _ in range(_IDEMPOTENCE_DEMONSTRATION):
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
- thread.join()
- del server
- del completion_queue
-
- _c.shut_down()
-
- def test_client_credentials(self):
- root_certificates = b'Trust starts here. Really.'
- private_key = b'This is a really bad private key, yo.'
- certificate_chain = b'Trust me! Do I not look trustworty?'
-
- _c.init()
-
- client_credentials = _c.ClientCredentials(
- None, None, None)
- self.assertIsNotNone(client_credentials)
- client_credentials = _c.ClientCredentials(
- root_certificates, None, None)
- self.assertIsNotNone(client_credentials)
- client_credentials = _c.ClientCredentials(
- None, private_key, certificate_chain)
- self.assertIsNotNone(client_credentials)
- client_credentials = _c.ClientCredentials(
- root_certificates, private_key, certificate_chain)
- self.assertIsNotNone(client_credentials)
- del client_credentials
-
- _c.shut_down()
-
- def test_server_credentials(self):
- root_certificates = b'Trust starts here. Really.'
- first_private_key = b'This is a really bad private key, yo.'
- first_certificate_chain = b'Trust me! Do I not look trustworty?'
- second_private_key = b'This is another bad private key, yo.'
- second_certificate_chain = b'Look into my eyes; you can totes trust me.'
-
- _c.init()
-
- server_credentials = _c.ServerCredentials(
- None, ((first_private_key, first_certificate_chain),))
- del server_credentials
- server_credentials = _c.ServerCredentials(
- root_certificates, ((first_private_key, first_certificate_chain),))
- del server_credentials
- server_credentials = _c.ServerCredentials(
- root_certificates,
- ((first_private_key, first_certificate_chain),
- (second_private_key, second_certificate_chain),))
- del server_credentials
- with self.assertRaises(TypeError):
- _c.ServerCredentials(
- root_certificates, first_private_key, second_certificate_chain)
-
- _c.shut_down()
-
- @unittest.skip('TODO(nathaniel): find and use real-enough test credentials')
- def test_secure_server(self):
- _c.init()
-
- server_credentials = _c.ServerCredentials(
- 'root certificate', (('private key', 'certificate chain'),))
-
- completion_queue = _c.CompletionQueue()
- server = _c.Server(completion_queue, server_credentials)
- server.add_http2_addr('[::]:0')
- server.start()
- thread = threading.Thread(target=completion_queue.get, args=(_FUTURE,))
- thread.start()
- time.sleep(1)
- server.stop()
- completion_queue.stop()
- for _ in range(_IDEMPOTENCE_DEMONSTRATION):
- event = completion_queue.get(time.time() + _TIMEOUT)
- self.assertIs(event.kind, _datatypes.Event.Kind.STOP)
- thread.join()
- del server
- del completion_queue
+ def testChannelUpDown(self):
+ channel = _c.Channel('[::]:0', [])
+ del channel
- _c.shut_down()
+ def testSecureChannelUpDown(self):
+ channel = _c.Channel('[::]:0', [], _c.ClientCredentials.fake_transport_security())
+ del channel
if __name__ == '__main__':
diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c
deleted file mode 100644
index d833268fc9..0000000000
--- a/src/python/src/grpc/_adapter/_call.c
+++ /dev/null
@@ -1,438 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_call.h"
-
-#include <math.h>
-#include <Python.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-#include "grpc/_adapter/_channel.h"
-#include "grpc/_adapter/_completion_queue.h"
-#include "grpc/_adapter/_error.h"
-#include "grpc/_adapter/_tag.h"
-
-static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
- Call *self = (Call *)type->tp_alloc(type, 0);
- Channel *channel;
- CompletionQueue *completion_queue;
- const char *method;
- const char *host;
- double deadline;
- static char *kwlist[] = {"channel", "completion_queue",
- "method", "host", "deadline", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(
- args, kwds, "O!O!ssd:Call", kwlist,
- &pygrpc_ChannelType, &channel,
- &pygrpc_CompletionQueueType, &completion_queue,
- &method, &host, &deadline)) {
- return NULL;
- }
-
- /* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own
- * function with its own test coverage.
- */
- self->c_call = grpc_channel_create_call(
- channel->c_channel, completion_queue->c_completion_queue, method, host,
- gpr_time_from_nanos(deadline * GPR_NS_PER_SEC));
- self->completion_queue = completion_queue;
- Py_INCREF(self->completion_queue);
- self->channel = channel;
- Py_INCREF(self->channel);
- grpc_call_details_init(&self->call_details);
- grpc_metadata_array_init(&self->recv_metadata);
- grpc_metadata_array_init(&self->recv_trailing_metadata);
- self->send_metadata = NULL;
- self->send_metadata_count = 0;
- self->send_trailing_metadata = NULL;
- self->send_trailing_metadata_count = 0;
- self->send_message = NULL;
- self->recv_message = NULL;
- self->adding_to_trailing = 0;
-
- return (PyObject *)self;
-}
-
-static void pygrpc_call_dealloc(Call *self) {
- if (self->c_call != NULL) {
- grpc_call_destroy(self->c_call);
- }
- Py_XDECREF(self->completion_queue);
- Py_XDECREF(self->channel);
- Py_XDECREF(self->server);
- grpc_call_details_destroy(&self->call_details);
- grpc_metadata_array_destroy(&self->recv_metadata);
- grpc_metadata_array_destroy(&self->recv_trailing_metadata);
- if (self->send_message) {
- grpc_byte_buffer_destroy(self->send_message);
- }
- if (self->recv_message) {
- grpc_byte_buffer_destroy(self->recv_message);
- }
- gpr_free(self->status_details);
- gpr_free(self->send_metadata);
- gpr_free(self->send_trailing_metadata);
- self->ob_type->tp_free((PyObject *)self);
-}
-
-static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) {
- PyObject *completion_queue;
- PyObject *metadata_tag;
- PyObject *finish_tag;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_init_metadata_tag;
- pygrpc_tag *c_metadata_tag;
- pygrpc_tag *c_finish_tag;
- grpc_op send_initial_metadata;
- grpc_op recv_initial_metadata;
- grpc_op recv_status_on_client;
-
- if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType,
- &completion_queue, &metadata_tag, &finish_tag))) {
- return NULL;
- }
- send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA;
- send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata;
- send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count;
- recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA;
- recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata;
- recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
- recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata;
- recv_status_on_client.data.recv_status_on_client.status = &self->status;
- recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details;
- recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity;
- c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
- c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self);
- c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self);
-
- call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_init_metadata_tag);
- pygrpc_tag_destroy(c_metadata_tag);
- pygrpc_tag_destroy(c_finish_tag);
- return result;
- }
- call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_metadata_tag);
- pygrpc_tag_destroy(c_finish_tag);
- return result;
- }
- call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_finish_tag);
- return result;
- }
-
- return result;
-}
-
-static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
- const char *bytes;
- int length;
- PyObject *tag;
- gpr_slice slice;
- grpc_byte_buffer *byte_buffer;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag;
- grpc_op op;
-
- if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) {
- return NULL;
- }
- c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self);
-
- slice = gpr_slice_from_copied_buffer(bytes, length);
- byte_buffer = grpc_byte_buffer_create(&slice, 1);
- gpr_slice_unref(slice);
-
- if (self->send_message) {
- grpc_byte_buffer_destroy(self->send_message);
- }
- self->send_message = byte_buffer;
-
- op.op = GRPC_OP_SEND_MESSAGE;
- op.data.send_message = self->send_message;
-
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
-
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) {
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
- grpc_op op;
-
- op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
-
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
-
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
- PyObject *completion_queue;
- PyObject *tag;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag;
- grpc_op op;
-
- if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType,
- &completion_queue, &tag))) {
- return NULL;
- }
-
- op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
- op.data.recv_close_on_server.cancelled = &self->cancelled;
- c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self);
-
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) {
- const char* key = NULL;
- const char* value = NULL;
- int value_length = 0;
- grpc_metadata metadata;
- if (!PyArg_ParseTuple(args, "ss#", &key, &value, &value_length)) {
- return NULL;
- }
- metadata.key = key;
- metadata.value = value;
- metadata.value_length = value_length;
- if (self->adding_to_trailing) {
- self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata));
- self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata;
- self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1;
- } else {
- self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata));
- self->send_metadata[self->send_metadata_count] = metadata;
- self->send_metadata_count = self->send_metadata_count + 1;
- }
- return pygrpc_translate_call_error(GRPC_CALL_OK);
-}
-
-static const PyObject *pygrpc_call_premetadata(Call *self) {
- grpc_op op;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
- op.op = GRPC_OP_SEND_INITIAL_METADATA;
- op.data.send_initial_metadata.metadata = self->send_metadata;
- op.data.send_initial_metadata.count = self->send_metadata_count;
- self->adding_to_trailing = 1;
-
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) {
- grpc_op op;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self);
-
- op.op = GRPC_OP_RECV_MESSAGE;
- if (self->recv_message) {
- grpc_byte_buffer_destroy(self->recv_message);
- self->recv_message = NULL;
- }
- op.data.recv_message = &self->recv_message;
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
- PyObject *status;
- PyObject *code;
- PyObject *details;
- PyObject *tag;
- grpc_status_code c_code;
- char *c_message;
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag;
- grpc_op op;
-
- if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) {
- return NULL;
- }
- c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
-
- code = PyObject_GetAttrString(status, "code");
- if (code == NULL) {
- return NULL;
- }
- details = PyObject_GetAttrString(status, "details");
- if (details == NULL) {
- Py_DECREF(code);
- return NULL;
- }
- c_code = PyInt_AsLong(code);
- Py_DECREF(code);
- if (c_code == -1 && PyErr_Occurred()) {
- Py_DECREF(details);
- return NULL;
- }
- c_message = PyBytes_AsString(details);
- Py_DECREF(details);
- if (c_message == NULL) {
- return NULL;
- }
- if (self->status_details) {
- gpr_free(self->status_details);
- }
- self->status_details = gpr_malloc(strlen(c_message)+1);
- strcpy(self->status_details, c_message);
- op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
- op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count;
- op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata;
- op.data.send_status_from_server.status = c_code;
- op.data.send_status_from_server.status_details = self->status_details;
-
- call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
- result = pygrpc_translate_call_error(call_error);
- if (result == NULL) {
- pygrpc_tag_destroy(c_tag);
- }
- return result;
-}
-
-static const PyObject *pygrpc_call_cancel(Call *self) {
- return pygrpc_translate_call_error(grpc_call_cancel(self->c_call));
-}
-
-static PyMethodDef methods[] = {
- {"invoke", (PyCFunction)pygrpc_call_invoke, METH_VARARGS,
- "Invoke this call."},
- {"write", (PyCFunction)pygrpc_call_write, METH_VARARGS,
- "Write bytes to this call."},
- {"complete", (PyCFunction)pygrpc_call_complete, METH_O,
- "Complete writes to this call."},
- {"accept", (PyCFunction)pygrpc_call_accept, METH_VARARGS, "Accept an RPC."},
- {"add_metadata", (PyCFunction)pygrpc_call_add_metadata, METH_VARARGS,
- "Add metadata to the call. May not be called after invoke on the client "
- "side. On the server side: when called before premetadata it provides "
- "'leading' metadata, when called after premetadata but before status it "
- "provides 'trailing metadata'; may not be called after status."},
- {"premetadata", (PyCFunction)pygrpc_call_premetadata, METH_VARARGS,
- "Indicate the end of leading metadata in the response."},
- {"read", (PyCFunction)pygrpc_call_read, METH_O,
- "Read bytes from this call."},
- {"status", (PyCFunction)pygrpc_call_status, METH_VARARGS,
- "Report this call's status."},
- {"cancel", (PyCFunction)pygrpc_call_cancel, METH_NOARGS,
- "Cancel this call."},
- {NULL}};
-
-PyTypeObject pygrpc_CallType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_grpc.Call", /*tp_name*/
- sizeof(Call), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_call_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_call.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- 0, /* tp_init */
- 0, /* tp_alloc */
- pygrpc_call_new, /* tp_new */
-};
-
-int pygrpc_add_call(PyObject *module) {
- if (PyType_Ready(&pygrpc_CallType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "Call", (PyObject *)&pygrpc_CallType) == -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_channel.c b/src/python/src/grpc/_adapter/_channel.c
deleted file mode 100644
index 6be8f1c364..0000000000
--- a/src/python/src/grpc/_adapter/_channel.c
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_channel.h"
-
-#include <Python.h>
-#include <grpc/grpc.h>
-#include <grpc/grpc_security.h>
-
-#include "grpc/_adapter/_client_credentials.h"
-
-static int pygrpc_channel_init(Channel *self, PyObject *args, PyObject *kwds) {
- const char *hostport;
- PyObject *client_credentials;
- char *server_host_override = NULL;
- static char *kwlist[] = {"hostport", "client_credentials",
- "server_host_override", NULL};
- grpc_arg server_host_override_arg;
- grpc_channel_args channel_args;
-
- if (!(PyArg_ParseTupleAndKeywords(args, kwds, "sO|z:Channel", kwlist,
- &hostport, &client_credentials,
- &server_host_override))) {
- return -1;
- }
- if (client_credentials == Py_None) {
- self->c_channel = grpc_channel_create(hostport, NULL);
- return 0;
- } else {
- if (server_host_override == NULL) {
- self->c_channel = grpc_secure_channel_create(
- ((ClientCredentials *)client_credentials)->c_client_credentials,
- hostport, NULL);
- } else {
- server_host_override_arg.type = GRPC_ARG_STRING;
- server_host_override_arg.key = GRPC_SSL_TARGET_NAME_OVERRIDE_ARG;
- server_host_override_arg.value.string = server_host_override;
- channel_args.num_args = 1;
- channel_args.args = &server_host_override_arg;
- self->c_channel = grpc_secure_channel_create(
- ((ClientCredentials *)client_credentials)->c_client_credentials,
- hostport, &channel_args);
- }
- return 0;
- }
-}
-
-static void pygrpc_channel_dealloc(Channel *self) {
- if (self->c_channel != NULL) {
- grpc_channel_destroy(self->c_channel);
- }
- self->ob_type->tp_free((PyObject *)self);
-}
-
-PyTypeObject pygrpc_ChannelType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_grpc.Channel", /*tp_name*/
- sizeof(Channel), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_channel_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_channel.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- 0, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)pygrpc_channel_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
-};
-
-int pygrpc_add_channel(PyObject *module) {
- if (PyType_Ready(&pygrpc_ChannelType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "Channel", (PyObject *)&pygrpc_ChannelType) ==
- -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_client_credentials.c b/src/python/src/grpc/_adapter/_client_credentials.c
deleted file mode 100644
index e8ccff8d17..0000000000
--- a/src/python/src/grpc/_adapter/_client_credentials.c
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_client_credentials.h"
-
-#include <Python.h>
-#include <grpc/grpc_security.h>
-#include <grpc/support/alloc.h>
-
-static int pygrpc_client_credentials_init(ClientCredentials *self,
- PyObject *args, PyObject *kwds) {
- char *root_certificates;
- grpc_ssl_pem_key_cert_pair key_certificate_pair;
- static char *kwlist[] = {"root_certificates", "private_key",
- "certificate_chain", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "zzz:ClientCredentials", kwlist,
- &root_certificates,
- &key_certificate_pair.private_key,
- &key_certificate_pair.cert_chain)) {
- return -1;
- }
-
- if (key_certificate_pair.private_key != NULL && key_certificate_pair.cert_chain != NULL) {
- self->c_client_credentials =
- grpc_ssl_credentials_create(root_certificates, &key_certificate_pair);
- } else {
- self->c_client_credentials =
- grpc_ssl_credentials_create(root_certificates, NULL);
- }
- return 0;
-}
-
-static void pygrpc_client_credentials_dealloc(ClientCredentials *self) {
- if (self->c_client_credentials != NULL) {
- grpc_credentials_release(self->c_client_credentials);
- }
- self->ob_type->tp_free((PyObject *)self);
-}
-
-PyTypeObject pygrpc_ClientCredentialsType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_grpc.ClientCredencials", /*tp_name*/
- sizeof(ClientCredentials), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_client_credentials_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_credentials.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- 0, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)pygrpc_client_credentials_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
-};
-
-int pygrpc_add_client_credentials(PyObject *module) {
- if (PyType_Ready(&pygrpc_ClientCredentialsType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "ClientCredentials",
- (PyObject *)&pygrpc_ClientCredentialsType) == -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c
deleted file mode 100644
index 97828e67ad..0000000000
--- a/src/python/src/grpc/_adapter/_completion_queue.c
+++ /dev/null
@@ -1,653 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_completion_queue.h"
-
-#include <Python.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-#include "grpc/_adapter/_call.h"
-#include "grpc/_adapter/_tag.h"
-
-static PyObject *status_class;
-static PyObject *service_acceptance_class;
-static PyObject *event_class;
-
-static PyObject *ok_status_code;
-static PyObject *cancelled_status_code;
-static PyObject *unknown_status_code;
-static PyObject *invalid_argument_status_code;
-static PyObject *expired_status_code;
-static PyObject *not_found_status_code;
-static PyObject *already_exists_status_code;
-static PyObject *permission_denied_status_code;
-static PyObject *unauthenticated_status_code;
-static PyObject *resource_exhausted_status_code;
-static PyObject *failed_precondition_status_code;
-static PyObject *aborted_status_code;
-static PyObject *out_of_range_status_code;
-static PyObject *unimplemented_status_code;
-static PyObject *internal_error_status_code;
-static PyObject *unavailable_status_code;
-static PyObject *data_loss_status_code;
-
-static PyObject *stop_event_kind;
-static PyObject *write_event_kind;
-static PyObject *complete_event_kind;
-static PyObject *service_event_kind;
-static PyObject *read_event_kind;
-static PyObject *metadata_event_kind;
-static PyObject *finish_event_kind;
-
-static PyObject *pygrpc_as_py_time(gpr_timespec *timespec) {
- return PyFloat_FromDouble(
- timespec->tv_sec + ((double)timespec->tv_nsec) / 1.0E9);
-}
-
-static PyObject *pygrpc_status_code(grpc_status_code c_status_code) {
- switch (c_status_code) {
- case GRPC_STATUS_OK:
- return ok_status_code;
- case GRPC_STATUS_CANCELLED:
- return cancelled_status_code;
- case GRPC_STATUS_UNKNOWN:
- return unknown_status_code;
- case GRPC_STATUS_INVALID_ARGUMENT:
- return invalid_argument_status_code;
- case GRPC_STATUS_DEADLINE_EXCEEDED:
- return expired_status_code;
- case GRPC_STATUS_NOT_FOUND:
- return not_found_status_code;
- case GRPC_STATUS_ALREADY_EXISTS:
- return already_exists_status_code;
- case GRPC_STATUS_PERMISSION_DENIED:
- return permission_denied_status_code;
- case GRPC_STATUS_UNAUTHENTICATED:
- return unauthenticated_status_code;
- case GRPC_STATUS_RESOURCE_EXHAUSTED:
- return resource_exhausted_status_code;
- case GRPC_STATUS_FAILED_PRECONDITION:
- return failed_precondition_status_code;
- case GRPC_STATUS_ABORTED:
- return aborted_status_code;
- case GRPC_STATUS_OUT_OF_RANGE:
- return out_of_range_status_code;
- case GRPC_STATUS_UNIMPLEMENTED:
- return unimplemented_status_code;
- case GRPC_STATUS_INTERNAL:
- return internal_error_status_code;
- case GRPC_STATUS_UNAVAILABLE:
- return unavailable_status_code;
- case GRPC_STATUS_DATA_LOSS:
- return data_loss_status_code;
- default:
- return NULL;
- }
-}
-
-static PyObject *pygrpc_metadata_collection_get(
- grpc_metadata *metadata_elements, size_t count) {
- PyObject *metadata = PyList_New(count);
- size_t i;
- for (i = 0; i < count; ++i) {
- grpc_metadata elem = metadata_elements[i];
- PyObject *key = PyString_FromString(elem.key);
- PyObject *value = PyString_FromStringAndSize(elem.value, elem.value_length);
- PyObject* kvp = PyTuple_Pack(2, key, value);
- /* n.b. PyList_SetItem *steals* a reference to the set element. */
- PyList_SetItem(metadata, i, kvp);
- Py_DECREF(key);
- Py_DECREF(value);
- }
- return metadata;
-}
-
-static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
- return PyTuple_Pack(8, stop_event_kind, Py_None, Py_None, Py_None,
- Py_None, Py_None, Py_None, Py_None);
-}
-
-static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
- PyObject *write_accepted = Py_True;
- return PyTuple_Pack(8, write_event_kind, user_tag,
- write_accepted, Py_None, Py_None, Py_None, Py_None,
- Py_None);
-}
-
-static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
- PyObject *complete_accepted = Py_True;
- return PyTuple_Pack(8, complete_event_kind, user_tag,
- Py_None, complete_accepted, Py_None, Py_None, Py_None,
- Py_None);
-}
-
-static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
- if (tag->call->call_details.method == NULL) {
- return PyTuple_Pack(
- 8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None,
- Py_None, Py_None);
- } else {
- PyObject *method = NULL;
- PyObject *host = NULL;
- PyObject *service_deadline = NULL;
- PyObject *service_acceptance = NULL;
- PyObject *metadata = NULL;
- PyObject *event_args = NULL;
-
- method = PyBytes_FromString(tag->call->call_details.method);
- if (method == NULL) {
- goto error;
- }
- host = PyBytes_FromString(tag->call->call_details.host);
- if (host == NULL) {
- goto error;
- }
- service_deadline =
- pygrpc_as_py_time(&tag->call->call_details.deadline);
- if (service_deadline == NULL) {
- goto error;
- }
-
- service_acceptance =
- PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call,
- method, host, service_deadline, NULL);
- if (service_acceptance == NULL) {
- goto error;
- }
-
- metadata = pygrpc_metadata_collection_get(
- tag->call->recv_metadata.metadata,
- tag->call->recv_metadata.count);
- event_args = PyTuple_Pack(8, service_event_kind,
- user_tag, Py_None, Py_None,
- service_acceptance, Py_None, Py_None,
- metadata);
-
- Py_DECREF(service_acceptance);
- Py_DECREF(metadata);
-error:
- Py_XDECREF(method);
- Py_XDECREF(host);
- Py_XDECREF(service_deadline);
-
- return event_args;
- }
-}
-
-static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
- if (tag->call->recv_message == NULL) {
- return PyTuple_Pack(8, read_event_kind, user_tag,
- Py_None, Py_None, Py_None, Py_None, Py_None, Py_None);
- } else {
- size_t length;
- size_t offset;
- grpc_byte_buffer_reader *reader;
- gpr_slice slice;
- char *c_bytes;
- PyObject *bytes;
- PyObject *event_args;
-
- length = grpc_byte_buffer_length(tag->call->recv_message);
- reader = grpc_byte_buffer_reader_create(tag->call->recv_message);
- c_bytes = gpr_malloc(length);
- offset = 0;
- while (grpc_byte_buffer_reader_next(reader, &slice)) {
- memcpy(c_bytes + offset, GPR_SLICE_START_PTR(slice),
- GPR_SLICE_LENGTH(slice));
- offset += GPR_SLICE_LENGTH(slice);
- }
- grpc_byte_buffer_reader_destroy(reader);
- bytes = PyBytes_FromStringAndSize(c_bytes, length);
- gpr_free(c_bytes);
- if (bytes == NULL) {
- return NULL;
- }
- event_args = PyTuple_Pack(8, read_event_kind, user_tag,
- Py_None, Py_None, Py_None, bytes, Py_None,
- Py_None);
- Py_DECREF(bytes);
- return event_args;
- }
-}
-
-static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
- PyObject *metadata = pygrpc_metadata_collection_get(
- tag->call->recv_metadata.metadata,
- tag->call->recv_metadata.count);
- PyObject* result = PyTuple_Pack(
- 8, metadata_event_kind, user_tag, Py_None, Py_None,
- Py_None, Py_None, Py_None, metadata);
- Py_DECREF(metadata);
- return result;
-}
-
-static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) {
- PyObject *code;
- PyObject *details;
- PyObject *status;
- PyObject *event_args;
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
-
- code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK);
- if (code == NULL) {
- PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
- return NULL;
- }
- details = PyBytes_FromString("");
- if (details == NULL) {
- return NULL;
- }
- status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL);
- Py_DECREF(details);
- if (status == NULL) {
- return NULL;
- }
- event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
- Py_None, Py_None, Py_None, Py_None, status,
- Py_None);
- Py_DECREF(status);
- return event_args;
-}
-
-static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) {
- PyObject *code;
- PyObject *details;
- PyObject *status;
- PyObject *event_args;
- PyObject *metadata;
- pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
- PyObject *user_tag = tag->user_tag;
-
- code = pygrpc_status_code(tag->call->status);
- if (code == NULL) {
- PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
- return NULL;
- }
- if (tag->call->status_details == NULL) {
- details = PyBytes_FromString("");
- } else {
- details = PyBytes_FromString(tag->call->status_details);
- }
- if (details == NULL) {
- return NULL;
- }
- status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL);
- Py_DECREF(details);
- if (status == NULL) {
- return NULL;
- }
- metadata = pygrpc_metadata_collection_get(
- tag->call->recv_trailing_metadata.metadata,
- tag->call->recv_trailing_metadata.count);
- event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
- Py_None, Py_None, Py_None, Py_None, status,
- metadata);
- Py_DECREF(status);
- Py_DECREF(metadata);
- return event_args;
-}
-
-static int pygrpc_completion_queue_init(CompletionQueue *self, PyObject *args,
- PyObject *kwds) {
- static char *kwlist[] = {NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwds, ":CompletionQueue", kwlist)) {
- return -1;
- }
- self->c_completion_queue = grpc_completion_queue_create();
- return 0;
-}
-
-static void pygrpc_completion_queue_dealloc(CompletionQueue *self) {
- grpc_completion_queue_destroy(self->c_completion_queue);
- self->ob_type->tp_free((PyObject *)self);
-}
-
-static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
- PyObject *args) {
- PyObject *deadline;
- double double_deadline;
- gpr_timespec deadline_timespec;
- grpc_event c_event;
-
- PyObject *event_args;
- PyObject *event;
-
- pygrpc_tag *tag;
-
- if (!(PyArg_ParseTuple(args, "O:get", &deadline))) {
- return NULL;
- }
-
- if (deadline == Py_None) {
- deadline_timespec = gpr_inf_future;
- } else {
- double_deadline = PyFloat_AsDouble(deadline);
- if (PyErr_Occurred()) {
- return NULL;
- }
- deadline_timespec = gpr_time_from_nanos((long)(double_deadline * 1.0E9));
- }
-
- /* TODO(nathaniel): Suppress clang-format in this block and remove the
- unnecessary and unPythonic semicolons trailing the _ALLOW_THREADS macros.
- (Right now clang-format only understands //-demarcated suppressions.) */
- Py_BEGIN_ALLOW_THREADS;
- c_event =
- grpc_completion_queue_next(self->c_completion_queue, deadline_timespec);
- Py_END_ALLOW_THREADS;
-
- tag = (pygrpc_tag *)c_event.tag;
-
- switch (c_event.type) {
- case GRPC_QUEUE_TIMEOUT:
- Py_RETURN_NONE;
- break;
- case GRPC_QUEUE_SHUTDOWN:
- event_args = pygrpc_stop_event_args(&c_event);
- break;
- case GRPC_OP_COMPLETE: {
- if (!tag) {
- PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
- return NULL;
- }
- switch (tag->type) {
- case PYGRPC_INITIAL_METADATA:
- if (tag) {
- pygrpc_tag_destroy(tag);
- }
- return pygrpc_completion_queue_get(self, args);
- case PYGRPC_WRITE_ACCEPTED:
- event_args = pygrpc_write_event_args(&c_event);
- break;
- case PYGRPC_FINISH_ACCEPTED:
- event_args = pygrpc_complete_event_args(&c_event);
- break;
- case PYGRPC_SERVER_RPC_NEW:
- event_args = pygrpc_service_event_args(&c_event);
- break;
- case PYGRPC_READ:
- event_args = pygrpc_read_event_args(&c_event);
- break;
- case PYGRPC_CLIENT_METADATA_READ:
- event_args = pygrpc_metadata_event_args(&c_event);
- break;
- case PYGRPC_FINISHED_CLIENT:
- event_args = pygrpc_finished_client_event_args(&c_event);
- break;
- case PYGRPC_FINISHED_SERVER:
- event_args = pygrpc_finished_server_event_args(&c_event);
- break;
- default:
- PyErr_SetString(PyExc_Exception, "Unrecognized op event type!");
- return NULL;
- }
- break;
- }
- default:
- PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
- return NULL;
- }
-
- if (event_args == NULL) {
- return NULL;
- }
-
- event = PyObject_CallObject(event_class, event_args);
-
- Py_DECREF(event_args);
- if (tag) {
- pygrpc_tag_destroy(tag);
- }
-
- return event;
-}
-
-static PyObject *pygrpc_completion_queue_stop(CompletionQueue *self) {
- grpc_completion_queue_shutdown(self->c_completion_queue);
-
- Py_RETURN_NONE;
-}
-
-static PyMethodDef methods[] = {
- {"get", (PyCFunction)pygrpc_completion_queue_get, METH_VARARGS,
- "Get the next event."},
- {"stop", (PyCFunction)pygrpc_completion_queue_stop, METH_NOARGS,
- "Stop this completion queue."},
- {NULL}};
-
-PyTypeObject pygrpc_CompletionQueueType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_gprc.CompletionQueue", /*tp_name*/
- sizeof(CompletionQueue), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_completion_queue_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_completion_queue.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)pygrpc_completion_queue_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
-};
-
-static int pygrpc_get_status_codes(PyObject *datatypes_module) {
- PyObject *code_class = PyObject_GetAttrString(datatypes_module, "Code");
- if (code_class == NULL) {
- return -1;
- }
- ok_status_code = PyObject_GetAttrString(code_class, "OK");
- if (ok_status_code == NULL) {
- return -1;
- }
- cancelled_status_code = PyObject_GetAttrString(code_class, "CANCELLED");
- if (cancelled_status_code == NULL) {
- return -1;
- }
- unknown_status_code = PyObject_GetAttrString(code_class, "UNKNOWN");
- if (unknown_status_code == NULL) {
- return -1;
- }
- invalid_argument_status_code =
- PyObject_GetAttrString(code_class, "INVALID_ARGUMENT");
- if (invalid_argument_status_code == NULL) {
- return -1;
- }
- expired_status_code = PyObject_GetAttrString(code_class, "EXPIRED");
- if (expired_status_code == NULL) {
- return -1;
- }
- not_found_status_code = PyObject_GetAttrString(code_class, "NOT_FOUND");
- if (not_found_status_code == NULL) {
- return -1;
- }
- already_exists_status_code =
- PyObject_GetAttrString(code_class, "ALREADY_EXISTS");
- if (already_exists_status_code == NULL) {
- return -1;
- }
- permission_denied_status_code =
- PyObject_GetAttrString(code_class, "PERMISSION_DENIED");
- if (permission_denied_status_code == NULL) {
- return -1;
- }
- unauthenticated_status_code =
- PyObject_GetAttrString(code_class, "UNAUTHENTICATED");
- if (unauthenticated_status_code == NULL) {
- return -1;
- }
- resource_exhausted_status_code =
- PyObject_GetAttrString(code_class, "RESOURCE_EXHAUSTED");
- if (resource_exhausted_status_code == NULL) {
- return -1;
- }
- failed_precondition_status_code =
- PyObject_GetAttrString(code_class, "FAILED_PRECONDITION");
- if (failed_precondition_status_code == NULL) {
- return -1;
- }
- aborted_status_code = PyObject_GetAttrString(code_class, "ABORTED");
- if (aborted_status_code == NULL) {
- return -1;
- }
- out_of_range_status_code = PyObject_GetAttrString(code_class, "OUT_OF_RANGE");
- if (out_of_range_status_code == NULL) {
- return -1;
- }
- unimplemented_status_code =
- PyObject_GetAttrString(code_class, "UNIMPLEMENTED");
- if (unimplemented_status_code == NULL) {
- return -1;
- }
- internal_error_status_code =
- PyObject_GetAttrString(code_class, "INTERNAL_ERROR");
- if (internal_error_status_code == NULL) {
- return -1;
- }
- unavailable_status_code = PyObject_GetAttrString(code_class, "UNAVAILABLE");
- if (unavailable_status_code == NULL) {
- return -1;
- }
- data_loss_status_code = PyObject_GetAttrString(code_class, "DATA_LOSS");
- if (data_loss_status_code == NULL) {
- return -1;
- }
- Py_DECREF(code_class);
- return 0;
-}
-
-static int pygrpc_get_event_kinds(PyObject *event_class) {
- PyObject *kind_class = PyObject_GetAttrString(event_class, "Kind");
- if (kind_class == NULL) {
- return -1;
- }
- stop_event_kind = PyObject_GetAttrString(kind_class, "STOP");
- if (stop_event_kind == NULL) {
- return -1;
- }
- write_event_kind = PyObject_GetAttrString(kind_class, "WRITE_ACCEPTED");
- if (write_event_kind == NULL) {
- return -1;
- }
- complete_event_kind = PyObject_GetAttrString(kind_class, "COMPLETE_ACCEPTED");
- if (complete_event_kind == NULL) {
- return -1;
- }
- service_event_kind = PyObject_GetAttrString(kind_class, "SERVICE_ACCEPTED");
- if (service_event_kind == NULL) {
- return -1;
- }
- read_event_kind = PyObject_GetAttrString(kind_class, "READ_ACCEPTED");
- if (read_event_kind == NULL) {
- return -1;
- }
- metadata_event_kind = PyObject_GetAttrString(kind_class, "METADATA_ACCEPTED");
- if (metadata_event_kind == NULL) {
- return -1;
- }
- finish_event_kind = PyObject_GetAttrString(kind_class, "FINISH");
- if (finish_event_kind == NULL) {
- return -1;
- }
- Py_DECREF(kind_class);
- return 0;
-}
-
-int pygrpc_add_completion_queue(PyObject *module) {
- char *datatypes_module_path = "grpc._adapter._datatypes";
- PyObject *datatypes_module = PyImport_ImportModule(datatypes_module_path);
- if (datatypes_module == NULL) {
- return -1;
- }
- status_class = PyObject_GetAttrString(datatypes_module, "Status");
- service_acceptance_class =
- PyObject_GetAttrString(datatypes_module, "ServiceAcceptance");
- event_class = PyObject_GetAttrString(datatypes_module, "Event");
- if (status_class == NULL || service_acceptance_class == NULL ||
- event_class == NULL) {
- return -1;
- }
- if (pygrpc_get_status_codes(datatypes_module) == -1) {
- return -1;
- }
- if (pygrpc_get_event_kinds(event_class) == -1) {
- return -1;
- }
- Py_DECREF(datatypes_module);
-
- if (PyType_Ready(&pygrpc_CompletionQueueType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "CompletionQueue",
- (PyObject *)&pygrpc_CompletionQueueType) == -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_datatypes.py b/src/python/src/grpc/_adapter/_datatypes.py
deleted file mode 100644
index 3b22784243..0000000000
--- a/src/python/src/grpc/_adapter/_datatypes.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright 2015, Google Inc.
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above
-# copyright notice, this list of conditions and the following disclaimer
-# in the documentation and/or other materials provided with the
-# distribution.
-# * Neither the name of Google Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-"""Datatypes passed between Python and C code."""
-
-import collections
-import enum
-
-
-@enum.unique
-class Code(enum.IntEnum):
- """One Platform error codes (see status.h and codes.proto)."""
-
- OK = 0
- CANCELLED = 1
- UNKNOWN = 2
- INVALID_ARGUMENT = 3
- EXPIRED = 4
- NOT_FOUND = 5
- ALREADY_EXISTS = 6
- PERMISSION_DENIED = 7
- UNAUTHENTICATED = 16
- RESOURCE_EXHAUSTED = 8
- FAILED_PRECONDITION = 9
- ABORTED = 10
- OUT_OF_RANGE = 11
- UNIMPLEMENTED = 12
- INTERNAL_ERROR = 13
- UNAVAILABLE = 14
- DATA_LOSS = 15
-
-
-class Status(collections.namedtuple('Status', ['code', 'details'])):
- """Describes an RPC's overall status."""
-
-
-class ServiceAcceptance(
- collections.namedtuple(
- 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
- """Describes an RPC on the service side at the start of service."""
-
-
-class Event(
- collections.namedtuple(
- 'Event',
- ['kind', 'tag', 'write_accepted', 'complete_accepted',
- 'service_acceptance', 'bytes', 'status', 'metadata'])):
- """Describes an event emitted from a completion queue."""
-
- @enum.unique
- class Kind(enum.Enum):
- """Describes the kind of an event."""
-
- STOP = object()
- WRITE_ACCEPTED = object()
- COMPLETE_ACCEPTED = object()
- SERVICE_ACCEPTED = object()
- READ_ACCEPTED = object()
- METADATA_ACCEPTED = object()
- FINISH = object()
diff --git a/src/python/src/grpc/_adapter/_error.c b/src/python/src/grpc/_adapter/_error.c
deleted file mode 100644
index a8a1dbc1bb..0000000000
--- a/src/python/src/grpc/_adapter/_error.c
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_error.h"
-
-#include <Python.h>
-#include <grpc/grpc.h>
-
-const PyObject *pygrpc_translate_call_error(grpc_call_error call_error) {
- switch (call_error) {
- case GRPC_CALL_OK:
- Py_RETURN_NONE;
- case GRPC_CALL_ERROR:
- PyErr_SetString(PyExc_Exception, "Defect: unknown defect!");
- return NULL;
- case GRPC_CALL_ERROR_NOT_ON_SERVER:
- PyErr_SetString(PyExc_Exception,
- "Defect: client-only method called on server!");
- return NULL;
- case GRPC_CALL_ERROR_NOT_ON_CLIENT:
- PyErr_SetString(PyExc_Exception,
- "Defect: server-only method called on client!");
- return NULL;
- case GRPC_CALL_ERROR_ALREADY_ACCEPTED:
- PyErr_SetString(PyExc_Exception,
- "Defect: attempted to accept already-accepted call!");
- return NULL;
- case GRPC_CALL_ERROR_ALREADY_INVOKED:
- PyErr_SetString(PyExc_Exception,
- "Defect: attempted to invoke already-invoked call!");
- return NULL;
- case GRPC_CALL_ERROR_NOT_INVOKED:
- PyErr_SetString(PyExc_Exception, "Defect: Call not yet invoked!");
- return NULL;
- case GRPC_CALL_ERROR_ALREADY_FINISHED:
- PyErr_SetString(PyExc_Exception, "Defect: Call already finished!");
- return NULL;
- case GRPC_CALL_ERROR_TOO_MANY_OPERATIONS:
- PyErr_SetString(PyExc_Exception,
- "Defect: Attempted extra read or extra write on call!");
- return NULL;
- case GRPC_CALL_ERROR_INVALID_FLAGS:
- PyErr_SetString(PyExc_Exception, "Defect: invalid flags!");
- return NULL;
- default:
- PyErr_SetString(PyExc_Exception, "Defect: Unknown call error!");
- return NULL;
- }
-}
diff --git a/src/python/src/grpc/_adapter/_intermediary_low.py b/src/python/src/grpc/_adapter/_intermediary_low.py
new file mode 100644
index 0000000000..a6e325c4e5
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_intermediary_low.py
@@ -0,0 +1,258 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Temporary old _low-like layer.
+
+Eases refactoring burden while we overhaul the Python framework.
+
+Plan:
+ The layers used to look like:
+ ... # outside _adapter
+ fore.py + rear.py # visible outside _adapter
+ _low
+ _c
+ The layers currently look like:
+ ... # outside _adapter
+ fore.py + rear.py # visible outside _adapter
+ _low_intermediary # adapter for new '_low' to old '_low'
+ _low # new '_low'
+ _c # new '_c'
+ We will later remove _low_intermediary after refactoring of fore.py and
+ rear.py according to the ticket system refactoring and get:
+ ... # outside _adapter, refactored
+ fore.py + rear.py # visible outside _adapter, refactored
+ _low # new '_low'
+ _c # new '_c'
+"""
+
+import collections
+import enum
+
+from grpc._adapter import _low
+from grpc._adapter import _types
+
+_IGNORE_ME_TAG = object()
+Code = _types.StatusCode
+
+
+class Status(collections.namedtuple('Status', ['code', 'details'])):
+ """Describes an RPC's overall status."""
+
+
+class ServiceAcceptance(
+ collections.namedtuple(
+ 'ServiceAcceptance', ['call', 'method', 'host', 'deadline'])):
+ """Describes an RPC on the service side at the start of service."""
+
+
+class Event(
+ collections.namedtuple(
+ 'Event',
+ ['kind', 'tag', 'write_accepted', 'complete_accepted',
+ 'service_acceptance', 'bytes', 'status', 'metadata'])):
+ """Describes an event emitted from a completion queue."""
+
+ @enum.unique
+ class Kind(enum.Enum):
+ """Describes the kind of an event."""
+
+ STOP = object()
+ WRITE_ACCEPTED = object()
+ COMPLETE_ACCEPTED = object()
+ SERVICE_ACCEPTED = object()
+ READ_ACCEPTED = object()
+ METADATA_ACCEPTED = object()
+ FINISH = object()
+
+
+class _TagAdapter(collections.namedtuple('_TagAdapter', [
+ 'user_tag',
+ 'kind'
+ ])):
+ pass
+
+
+class Call(object):
+ """Adapter from old _low.Call interface to new _low.Call."""
+
+ def __init__(self, channel, completion_queue, method, host, deadline):
+ self._internal = channel._internal.create_call(
+ completion_queue._internal, method, host, deadline)
+ self._metadata = []
+
+ @staticmethod
+ def _from_internal(internal):
+ call = Call.__new__(Call)
+ call._internal = internal
+ call._metadata = []
+ return call
+
+ def invoke(self, completion_queue, metadata_tag, finish_tag):
+ err0 = self._internal.start_batch([
+ _types.OpArgs.send_initial_metadata(self._metadata)
+ ], _IGNORE_ME_TAG)
+ err1 = self._internal.start_batch([
+ _types.OpArgs.recv_initial_metadata()
+ ], _TagAdapter(metadata_tag, Event.Kind.METADATA_ACCEPTED))
+ err2 = self._internal.start_batch([
+ _types.OpArgs.recv_status_on_client()
+ ], _TagAdapter(finish_tag, Event.Kind.FINISH))
+ return err0 if err0 != _types.CallError.OK else err1 if err1 != _types.CallError.OK else err2 if err2 != _types.CallError.OK else _types.CallError.OK
+
+ def write(self, message, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.send_message(message)
+ ], _TagAdapter(tag, Event.Kind.WRITE_ACCEPTED))
+
+ def complete(self, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.send_close_from_client()
+ ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
+
+ def accept(self, completion_queue, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.recv_close_on_server()
+ ], _TagAdapter(tag, Event.Kind.FINISH))
+
+ def add_metadata(self, key, value):
+ self._metadata.append((key, value))
+
+ def premetadata(self):
+ return self._internal.start_batch([
+ _types.OpArgs.send_initial_metadata(self._metadata)
+ ], _IGNORE_ME_TAG)
+ self._metadata = []
+
+ def read(self, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.recv_message()
+ ], _TagAdapter(tag, Event.Kind.READ_ACCEPTED))
+
+ def status(self, status, tag):
+ return self._internal.start_batch([
+ _types.OpArgs.send_status_from_server(self._metadata, status.code, status.details)
+ ], _TagAdapter(tag, Event.Kind.COMPLETE_ACCEPTED))
+
+ def cancel(self):
+ return self._internal.cancel()
+
+
+class Channel(object):
+ """Adapter from old _low.Channel interface to new _low.Channel."""
+
+ def __init__(self, hostport, client_credentials, server_host_override=None):
+ args = []
+ if server_host_override:
+ args.append((_types.GrpcChannelArgumentKeys.SSL_TARGET_NAME_OVERRIDE.value, server_host_override))
+ creds = None
+ if client_credentials:
+ creds = client_credentials._internal
+ self._internal = _low.Channel(hostport, args, creds)
+
+
+class CompletionQueue(object):
+ """Adapter from old _low.CompletionQueue interface to new _low.CompletionQueue."""
+
+ def __init__(self):
+ self._internal = _low.CompletionQueue()
+
+ def get(self, deadline=None):
+ if deadline is None:
+ ev = self._internal.next()
+ else:
+ ev = self._internal.next(deadline)
+ if ev is None:
+ return None
+ elif ev.tag is _IGNORE_ME_TAG:
+ return self.get(deadline)
+ elif ev.type == _types.EventType.QUEUE_SHUTDOWN:
+ kind = Event.Kind.STOP
+ tag = None
+ write_accepted = None
+ complete_accepted = None
+ service_acceptance = None
+ message_bytes = None
+ status = None
+ metadata = None
+ elif ev.type == _types.EventType.OP_COMPLETE:
+ kind = ev.tag.kind
+ tag = ev.tag.user_tag
+ write_accepted = ev.success if kind == Event.Kind.WRITE_ACCEPTED else None
+ complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None
+ service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None
+ message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None
+ status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if ev.results[0].cancelled is not None else None
+ metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None)
+ else:
+ raise RuntimeError('unknown event')
+ result_ev = Event(kind=kind, tag=tag, write_accepted=write_accepted, complete_accepted=complete_accepted, service_acceptance=service_acceptance, bytes=message_bytes, status=status, metadata=metadata)
+ return result_ev
+
+ def stop(self):
+ self._internal.shutdown()
+
+
+class Server(object):
+ """Adapter from old _low.Server interface to new _low.Server."""
+
+ def __init__(self, completion_queue):
+ self._internal = _low.Server(completion_queue._internal, [])
+ self._internal_cq = completion_queue._internal
+
+ def add_http2_addr(self, addr):
+ return self._internal.add_http2_port(addr)
+
+ def add_secure_http2_addr(self, addr, server_credentials):
+ if server_credentials is None:
+ return self._internal.add_http2_port(addr, None)
+ else:
+ return self._internal.add_http2_port(addr, server_credentials._internal)
+
+ def start(self):
+ return self._internal.start()
+
+ def service(self, tag):
+ return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED))
+
+ def stop(self):
+ return self._internal.shutdown()
+
+
+class ClientCredentials(object):
+ """Adapter from old _low.ClientCredentials interface to new _low.ClientCredentials."""
+
+ def __init__(self, root_certificates, private_key, certificate_chain):
+ self._internal = _low.ClientCredentials.ssl(root_certificates, private_key, certificate_chain)
+
+
+class ServerCredentials(object):
+ """Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials."""
+
+ def __init__(self, root_credentials, pair_sequence):
+ self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence))
diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py
new file mode 100644
index 0000000000..6ff51c43a6
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py
@@ -0,0 +1,421 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+"""Tests for the old '_low'."""
+
+import time
+import unittest
+
+from grpc._adapter import _intermediary_low as _low
+
+_STREAM_LENGTH = 300
+_TIMEOUT = 5
+_AFTER_DELAY = 2
+_FUTURE = time.time() + 60 * 60 * 24
+_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
+_BYTE_SEQUENCE_SEQUENCE = tuple(
+ bytes(bytearray((row + column) % 256 for column in range(row)))
+ for row in range(_STREAM_LENGTH))
+
+class LonelyClientTest(unittest.TestCase):
+
+ def testLonelyClient(self):
+ host = 'nosuchhostexists'
+ port = 54321
+ method = 'test method'
+ deadline = time.time() + _TIMEOUT
+ after_deadline = deadline + _AFTER_DELAY
+ metadata_tag = object()
+ finish_tag = object()
+
+ completion_queue = _low.CompletionQueue()
+ channel = _low.Channel('%s:%d' % (host, port), None)
+ client_call = _low.Call(channel, completion_queue, method, host, deadline)
+
+ client_call.invoke(completion_queue, metadata_tag, finish_tag)
+ first_event = completion_queue.get(after_deadline)
+ self.assertIsNotNone(first_event)
+ second_event = completion_queue.get(after_deadline)
+ self.assertIsNotNone(second_event)
+ kinds = [event.kind for event in (first_event, second_event)]
+ self.assertItemsEqual(
+ (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
+ kinds)
+
+ self.assertIsNone(completion_queue.get(after_deadline))
+
+ completion_queue.stop()
+ stop_event = completion_queue.get(_FUTURE)
+ self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
+
+ del client_call
+ del channel
+ del completion_queue
+
+
+class EchoTest(unittest.TestCase):
+
+ def setUp(self):
+ self.host = 'localhost'
+
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue)
+ port = self.server.add_http2_addr('[::]:0')
+ self.server.start()
+
+ self.client_completion_queue = _low.CompletionQueue()
+ self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+
+ def tearDown(self):
+ self.server.stop()
+ # NOTE(nathaniel): Yep, this is weird; it's a consequence of
+ # grpc_server_destroy's being what has the effect of telling the server's
+ # completion queue to pump out all pending events/tags immediately rather
+ # than gracefully completing all outstanding RPCs while accepting no new
+ # ones.
+ # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind
+ # of observable side effect let alone such an important one.
+ del self.server
+ self.server_completion_queue.stop()
+ self.client_completion_queue.stop()
+ while True:
+ event = self.server_completion_queue.get(_FUTURE)
+ if event is not None and event.kind is _low.Event.Kind.STOP:
+ break
+ while True:
+ event = self.client_completion_queue.get(_FUTURE)
+ if event is not None and event.kind is _low.Event.Kind.STOP:
+ break
+ self.server_completion_queue = None
+ self.client_completion_queue = None
+
+ def _perform_echo_test(self, test_data):
+ method = 'test method'
+ details = 'test details'
+ server_leading_metadata_key = 'my_server_leading_key'
+ server_leading_metadata_value = 'my_server_leading_value'
+ server_trailing_metadata_key = 'my_server_trailing_key'
+ server_trailing_metadata_value = 'my_server_trailing_value'
+ client_metadata_key = 'my_client_key'
+ client_metadata_value = 'my_client_value'
+ server_leading_binary_metadata_key = 'my_server_leading_key-bin'
+ server_leading_binary_metadata_value = b'\0'*2047
+ server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
+ server_trailing_binary_metadata_value = b'\0'*2047
+ client_binary_metadata_key = 'my_client_key-bin'
+ client_binary_metadata_value = b'\0'*2047
+ deadline = _FUTURE
+ metadata_tag = object()
+ finish_tag = object()
+ write_tag = object()
+ complete_tag = object()
+ service_tag = object()
+ read_tag = object()
+ status_tag = object()
+
+ server_data = []
+ client_data = []
+
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
+ client_call.add_metadata(client_metadata_key, client_metadata_value)
+ client_call.add_metadata(client_binary_metadata_key,
+ client_binary_metadata_value)
+
+ client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
+
+ self.server.service(service_tag)
+ service_accepted = self.server_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(service_accepted)
+ self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
+ self.assertIs(service_accepted.tag, service_tag)
+ self.assertEqual(method, service_accepted.service_acceptance.method)
+ self.assertEqual(self.host, service_accepted.service_acceptance.host)
+ self.assertIsNotNone(service_accepted.service_acceptance.call)
+ metadata = dict(service_accepted.metadata)
+ self.assertIn(client_metadata_key, metadata)
+ self.assertEqual(client_metadata_value, metadata[client_metadata_key])
+ self.assertIn(client_binary_metadata_key, metadata)
+ self.assertEqual(client_binary_metadata_value,
+ metadata[client_binary_metadata_key])
+ server_call = service_accepted.service_acceptance.call
+ server_call.accept(self.server_completion_queue, finish_tag)
+ server_call.add_metadata(server_leading_metadata_key,
+ server_leading_metadata_value)
+ server_call.add_metadata(server_leading_binary_metadata_key,
+ server_leading_binary_metadata_value)
+ server_call.premetadata()
+
+ metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(metadata_accepted)
+ self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
+ self.assertEqual(metadata_tag, metadata_accepted.tag)
+ metadata = dict(metadata_accepted.metadata)
+ self.assertIn(server_leading_metadata_key, metadata)
+ self.assertEqual(server_leading_metadata_value,
+ metadata[server_leading_metadata_key])
+ self.assertIn(server_leading_binary_metadata_key, metadata)
+ self.assertEqual(server_leading_binary_metadata_value,
+ metadata[server_leading_binary_metadata_key])
+
+ for datum in test_data:
+ client_call.write(datum, write_tag)
+ write_accepted = self.client_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(write_accepted)
+ self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
+ self.assertIs(write_accepted.tag, write_tag)
+ self.assertIs(write_accepted.write_accepted, True)
+
+ server_call.read(read_tag)
+ read_accepted = self.server_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNotNone(read_accepted.bytes)
+ server_data.append(read_accepted.bytes)
+
+ server_call.write(read_accepted.bytes, write_tag)
+ write_accepted = self.server_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(write_accepted)
+ self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
+ self.assertEqual(write_tag, write_accepted.tag)
+ self.assertTrue(write_accepted.write_accepted)
+
+ client_call.read(read_tag)
+ read_accepted = self.client_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNotNone(read_accepted.bytes)
+ client_data.append(read_accepted.bytes)
+
+ client_call.complete(complete_tag)
+ complete_accepted = self.client_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(complete_accepted)
+ self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
+ self.assertIs(complete_accepted.tag, complete_tag)
+ self.assertIs(complete_accepted.complete_accepted, True)
+
+ server_call.read(read_tag)
+ read_accepted = self.server_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(read_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNone(read_accepted.bytes)
+
+ server_call.add_metadata(server_trailing_metadata_key,
+ server_trailing_metadata_value)
+ server_call.add_metadata(server_trailing_binary_metadata_key,
+ server_trailing_binary_metadata_value)
+
+ server_call.status(_low.Status(_low.Code.OK, details), status_tag)
+ server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
+ status_accepted = server_terminal_event_one
+ rpc_accepted = server_terminal_event_two
+ else:
+ status_accepted = server_terminal_event_two
+ rpc_accepted = server_terminal_event_one
+ self.assertIsNotNone(status_accepted)
+ self.assertIsNotNone(rpc_accepted)
+ self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
+ self.assertEqual(status_tag, status_accepted.tag)
+ self.assertTrue(status_accepted.complete_accepted)
+ self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
+ self.assertEqual(finish_tag, rpc_accepted.tag)
+ self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
+
+ client_call.read(read_tag)
+ client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
+ client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
+ if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
+ read_accepted = client_terminal_event_one
+ finish_accepted = client_terminal_event_two
+ else:
+ read_accepted = client_terminal_event_two
+ finish_accepted = client_terminal_event_one
+ self.assertIsNotNone(read_accepted)
+ self.assertIsNotNone(finish_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertEqual(read_tag, read_accepted.tag)
+ self.assertIsNone(read_accepted.bytes)
+ self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
+ self.assertEqual(finish_tag, finish_accepted.tag)
+ self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
+ metadata = dict(finish_accepted.metadata)
+ self.assertIn(server_trailing_metadata_key, metadata)
+ self.assertEqual(server_trailing_metadata_value,
+ metadata[server_trailing_metadata_key])
+ self.assertIn(server_trailing_binary_metadata_key, metadata)
+ self.assertEqual(server_trailing_binary_metadata_value,
+ metadata[server_trailing_binary_metadata_key])
+
+ server_timeout_none_event = self.server_completion_queue.get(0)
+ self.assertIsNone(server_timeout_none_event)
+ client_timeout_none_event = self.client_completion_queue.get(0)
+ self.assertIsNone(client_timeout_none_event)
+
+ self.assertSequenceEqual(test_data, server_data)
+ self.assertSequenceEqual(test_data, client_data)
+
+ def testNoEcho(self):
+ self._perform_echo_test(())
+
+ def testOneByteEcho(self):
+ self._perform_echo_test([b'\x07'])
+
+ def testOneManyByteEcho(self):
+ self._perform_echo_test([_BYTE_SEQUENCE])
+
+ def testManyOneByteEchoes(self):
+ self._perform_echo_test(_BYTE_SEQUENCE)
+
+ def testManyManyByteEchoes(self):
+ self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
+
+
+class CancellationTest(unittest.TestCase):
+
+ def setUp(self):
+ self.host = 'localhost'
+
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue)
+ port = self.server.add_http2_addr('[::]:0')
+ self.server.start()
+
+ self.client_completion_queue = _low.CompletionQueue()
+ self.channel = _low.Channel('%s:%d' % (self.host, port), None)
+
+ def tearDown(self):
+ self.server.stop()
+ del self.server
+ self.server_completion_queue.stop()
+ self.client_completion_queue.stop()
+ while True:
+ event = self.server_completion_queue.get(0)
+ if event is not None and event.kind is _low.Event.Kind.STOP:
+ break
+ while True:
+ event = self.client_completion_queue.get(0)
+ if event is not None and event.kind is _low.Event.Kind.STOP:
+ break
+
+ def testCancellation(self):
+ method = 'test method'
+ deadline = _FUTURE
+ metadata_tag = object()
+ finish_tag = object()
+ write_tag = object()
+ service_tag = object()
+ read_tag = object()
+ test_data = _BYTE_SEQUENCE_SEQUENCE
+
+ server_data = []
+ client_data = []
+
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
+
+ client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
+
+ self.server.service(service_tag)
+ service_accepted = self.server_completion_queue.get(_FUTURE)
+ server_call = service_accepted.service_acceptance.call
+
+ server_call.accept(self.server_completion_queue, finish_tag)
+ server_call.premetadata()
+
+ metadata_accepted = self.client_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(metadata_accepted)
+
+ for datum in test_data:
+ client_call.write(datum, write_tag)
+ write_accepted = self.client_completion_queue.get(_FUTURE)
+
+ server_call.read(read_tag)
+ read_accepted = self.server_completion_queue.get(_FUTURE)
+ server_data.append(read_accepted.bytes)
+
+ server_call.write(read_accepted.bytes, write_tag)
+ write_accepted = self.server_completion_queue.get(_FUTURE)
+ self.assertIsNotNone(write_accepted)
+
+ client_call.read(read_tag)
+ read_accepted = self.client_completion_queue.get(_FUTURE)
+ client_data.append(read_accepted.bytes)
+
+ client_call.cancel()
+ # cancel() is idempotent.
+ client_call.cancel()
+ client_call.cancel()
+ client_call.cancel()
+
+ server_call.read(read_tag)
+
+ server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
+ server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
+ if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
+ read_accepted = server_terminal_event_one
+ rpc_accepted = server_terminal_event_two
+ else:
+ read_accepted = server_terminal_event_two
+ rpc_accepted = server_terminal_event_one
+ self.assertIsNotNone(read_accepted)
+ self.assertIsNotNone(rpc_accepted)
+ self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
+ self.assertIsNone(read_accepted.bytes)
+ self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
+ self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
+
+ finish_event = self.client_completion_queue.get(_FUTURE)
+ self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
+ self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
+ finish_event.status)
+
+ server_timeout_none_event = self.server_completion_queue.get(0)
+ self.assertIsNone(server_timeout_none_event)
+ client_timeout_none_event = self.client_completion_queue.get(0)
+ self.assertIsNone(client_timeout_none_event)
+
+ self.assertSequenceEqual(test_data, server_data)
+ self.assertSequenceEqual(test_data, client_data)
+
+
+class ExpirationTest(unittest.TestCase):
+
+ @unittest.skip('TODO(nathaniel): Expiration test!')
+ def testExpiration(self):
+ pass
+
+
+if __name__ == '__main__':
+ unittest.main(verbosity=2)
+
diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py
index a24baaeb3e..0c1d3b40a5 100644
--- a/src/python/src/grpc/_adapter/_low.py
+++ b/src/python/src/grpc/_adapter/_low.py
@@ -27,31 +27,85 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""A Python interface for GRPC C core structures and behaviors."""
-
-import atexit
-import gc
-
from grpc._adapter import _c
-from grpc._adapter import _datatypes
-
-def _shut_down():
- # force garbage collection before shutting down grpc, to ensure all grpc
- # objects are cleaned up
- gc.collect()
- _c.shut_down()
-
-_c.init()
-atexit.register(_shut_down)
-
-# pylint: disable=invalid-name
-Code = _datatypes.Code
-Status = _datatypes.Status
-Event = _datatypes.Event
-Call = _c.Call
-Channel = _c.Channel
-CompletionQueue = _c.CompletionQueue
-Server = _c.Server
+from grpc._adapter import _types
+
ClientCredentials = _c.ClientCredentials
ServerCredentials = _c.ServerCredentials
-# pylint: enable=invalid-name
+
+
+class CompletionQueue(_types.CompletionQueue):
+
+ def __init__(self):
+ self.completion_queue = _c.CompletionQueue()
+
+ def next(self, deadline=float('+inf')):
+ raw_event = self.completion_queue.next(deadline)
+ if raw_event is None:
+ return None
+ event = _types.Event(*raw_event)
+ if event.call is not None:
+ event = event._replace(call=Call(event.call))
+ if event.call_details is not None:
+ event = event._replace(call_details=_types.CallDetails(*event.call_details))
+ if event.results is not None:
+ new_results = [_types.OpResult(*r) for r in event.results]
+ new_results = [r if r.status is None else r._replace(status=_types.Status(_types.StatusCode(r.status[0]), r.status[1])) for r in new_results]
+ event = event._replace(results=new_results)
+ return event
+
+ def shutdown(self):
+ self.completion_queue.shutdown()
+
+
+class Call(_types.Call):
+
+ def __init__(self, call):
+ self.call = call
+
+ def start_batch(self, ops, tag):
+ return self.call.start_batch(ops, tag)
+
+ def cancel(self, code=None, details=None):
+ if code is None and details is None:
+ return self.call.cancel()
+ else:
+ return self.call.cancel(code, details)
+
+
+class Channel(_types.Channel):
+
+ def __init__(self, target, args, creds=None):
+ if creds is None:
+ self.channel = _c.Channel(target, args)
+ else:
+ self.channel = _c.Channel(target, args, creds)
+
+ def create_call(self, completion_queue, method, host, deadline=None):
+ return Call(self.channel.create_call(completion_queue.completion_queue, method, host, deadline))
+
+
+_NO_TAG = object()
+
+class Server(_types.Server):
+
+ def __init__(self, completion_queue, args):
+ self.server = _c.Server(completion_queue.completion_queue, args)
+
+ def add_http2_port(self, addr, creds=None):
+ if creds is None:
+ return self.server.add_http2_port(addr)
+ else:
+ return self.server.add_http2_port(addr, creds)
+
+ def start(self):
+ return self.server.start()
+
+ def shutdown(self, tag=_NO_TAG):
+ if tag is _NO_TAG:
+ return self.server.shutdown()
+ else:
+ return self.server.shutdown(tag)
+
+ def request_call(self, completion_queue, tag):
+ return self.server.request_call(completion_queue.completion_queue, tag)
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index 22006c36b8..e53b176caf 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -27,375 +27,141 @@
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-"""Tests for _adapter._low."""
-
import time
import unittest
+from grpc._adapter import _types
from grpc._adapter import _low
-_STREAM_LENGTH = 300
-_TIMEOUT = 5
-_AFTER_DELAY = 2
-_FUTURE = time.time() + 60 * 60 * 24
-_BYTE_SEQUENCE = b'\abcdefghijklmnopqrstuvwxyz0123456789' * 200
-_BYTE_SEQUENCE_SEQUENCE = tuple(
- bytes(bytearray((row + column) % 256 for column in range(row)))
- for row in range(_STREAM_LENGTH))
-
-class LonelyClientTest(unittest.TestCase):
-
- def testLonelyClient(self):
- host = 'nosuchhostexists'
- port = 54321
- method = 'test method'
- deadline = time.time() + _TIMEOUT
- after_deadline = deadline + _AFTER_DELAY
- metadata_tag = object()
- finish_tag = object()
-
- completion_queue = _low.CompletionQueue()
- channel = _low.Channel('%s:%d' % (host, port), None)
- client_call = _low.Call(channel, completion_queue, method, host, deadline)
-
- client_call.invoke(completion_queue, metadata_tag, finish_tag)
- first_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(first_event)
- second_event = completion_queue.get(after_deadline)
- self.assertIsNotNone(second_event)
- kinds = [event.kind for event in (first_event, second_event)]
- self.assertItemsEqual(
- (_low.Event.Kind.METADATA_ACCEPTED, _low.Event.Kind.FINISH),
- kinds)
-
- self.assertIsNone(completion_queue.get(after_deadline))
-
- completion_queue.stop()
- stop_event = completion_queue.get(_FUTURE)
- self.assertEqual(_low.Event.Kind.STOP, stop_event.kind)
-
-
-class EchoTest(unittest.TestCase):
-
- def setUp(self):
- self.host = 'localhost'
-
- self.completion_queue = _low.CompletionQueue()
-
- self.server = _low.Server(self.completion_queue)
- port = self.server.add_http2_addr('[::]:0')
- self.server.start()
-
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
-
- def tearDown(self):
- self.server.stop()
- # NOTE(nathaniel): Yep, this is weird; it's a consequence of
- # grpc_server_destroy's being what has the effect of telling the server's
- # completion queue to pump out all pending events/tags immediately rather
- # than gracefully completing all outstanding RPCs while accepting no new
- # ones.
- # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind
- # of observable side effect let alone such an important one.
- del self.server
- self.completion_queue.stop()
- while True:
- event = self.completion_queue.get(_FUTURE)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
- self.completion_queue = None
-
- def _perform_echo_test(self, test_data):
- method = 'test method'
- details = 'test details'
- server_leading_metadata_key = 'my_server_leading_key'
- server_leading_metadata_value = 'my_server_leading_value'
- server_trailing_metadata_key = 'my_server_trailing_key'
- server_trailing_metadata_value = 'my_server_trailing_value'
- client_metadata_key = 'my_client_key'
- client_metadata_value = 'my_client_value'
- server_leading_binary_metadata_key = 'my_server_leading_key-bin'
- server_leading_binary_metadata_value = b'\0'*2047
- server_trailing_binary_metadata_key = 'my_server_trailing_key-bin'
- server_trailing_binary_metadata_value = b'\0'*2047
- client_binary_metadata_key = 'my_client_key-bin'
- client_binary_metadata_value = b'\0'*2047
- deadline = _FUTURE
- metadata_tag = object()
- finish_tag = object()
- write_tag = object()
- complete_tag = object()
- service_tag = object()
- read_tag = object()
- status_tag = object()
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.completion_queue,
- method, self.host, deadline)
- client_call.add_metadata(client_metadata_key, client_metadata_value)
- client_call.add_metadata(client_binary_metadata_key,
- client_binary_metadata_value)
-
- client_call.invoke(self.completion_queue, metadata_tag, finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(service_accepted)
- self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
- self.assertIs(service_accepted.tag, service_tag)
- self.assertEqual(method, service_accepted.service_acceptance.method)
- self.assertEqual(self.host, service_accepted.service_acceptance.host)
- self.assertIsNotNone(service_accepted.service_acceptance.call)
- metadata = dict(service_accepted.metadata)
- self.assertIn(client_metadata_key, metadata)
- self.assertEqual(client_metadata_value, metadata[client_metadata_key])
- self.assertIn(client_binary_metadata_key, metadata)
- self.assertEqual(client_binary_metadata_value,
- metadata[client_binary_metadata_key])
- server_call = service_accepted.service_acceptance.call
- server_call.accept(self.completion_queue, finish_tag)
- server_call.add_metadata(server_leading_metadata_key,
- server_leading_metadata_value)
- server_call.add_metadata(server_leading_binary_metadata_key,
- server_leading_binary_metadata_value)
- server_call.premetadata()
-
- metadata_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(metadata_accepted)
- self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
- self.assertEqual(metadata_tag, metadata_accepted.tag)
- metadata = dict(metadata_accepted.metadata)
- self.assertIn(server_leading_metadata_key, metadata)
- self.assertEqual(server_leading_metadata_value,
- metadata[server_leading_metadata_key])
- self.assertIn(server_leading_binary_metadata_key, metadata)
- self.assertEqual(server_leading_binary_metadata_value,
- metadata[server_leading_binary_metadata_key])
-
- for datum in test_data:
- client_call.write(datum, write_tag)
- write_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(write_accepted)
- self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
- self.assertIs(write_accepted.tag, write_tag)
- self.assertIs(write_accepted.write_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(write_accepted)
- self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
- self.assertEqual(write_tag, write_accepted.tag)
- self.assertTrue(write_accepted.write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNotNone(read_accepted.bytes)
- client_data.append(read_accepted.bytes)
-
- client_call.complete(complete_tag)
- complete_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(complete_accepted)
- self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
- self.assertIs(complete_accepted.tag, complete_tag)
- self.assertIs(complete_accepted.complete_accepted, True)
-
- server_call.read(read_tag)
- read_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(read_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
- server_call.add_metadata(server_trailing_metadata_key,
- server_trailing_metadata_value)
- server_call.add_metadata(server_trailing_binary_metadata_key,
- server_trailing_binary_metadata_value)
-
- server_call.status(_low.Status(_low.Code.OK, details), status_tag)
- server_terminal_event_one = self.completion_queue.get(_FUTURE)
- server_terminal_event_two = self.completion_queue.get(_FUTURE)
- if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
- status_accepted = server_terminal_event_one
- rpc_accepted = server_terminal_event_two
- else:
- status_accepted = server_terminal_event_two
- rpc_accepted = server_terminal_event_one
- self.assertIsNotNone(status_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.COMPLETE_ACCEPTED, status_accepted.kind)
- self.assertEqual(status_tag, status_accepted.tag)
- self.assertTrue(status_accepted.complete_accepted)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(finish_tag, rpc_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
-
- client_call.read(read_tag)
- client_terminal_event_one = self.completion_queue.get(_FUTURE)
- client_terminal_event_two = self.completion_queue.get(_FUTURE)
- if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
- read_accepted = client_terminal_event_one
- finish_accepted = client_terminal_event_two
- else:
- read_accepted = client_terminal_event_two
- finish_accepted = client_terminal_event_one
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(finish_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertEqual(read_tag, read_accepted.tag)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, finish_accepted.kind)
- self.assertEqual(finish_tag, finish_accepted.tag)
- self.assertEqual(_low.Status(_low.Code.OK, details), finish_accepted.status)
- metadata = dict(finish_accepted.metadata)
- self.assertIn(server_trailing_metadata_key, metadata)
- self.assertEqual(server_trailing_metadata_value,
- metadata[server_trailing_metadata_key])
- self.assertIn(server_trailing_binary_metadata_key, metadata)
- self.assertEqual(server_trailing_binary_metadata_value,
- metadata[server_trailing_binary_metadata_key])
-
- server_timeout_none_event = self.completion_queue.get(0)
- self.assertIsNone(server_timeout_none_event)
- client_timeout_none_event = self.completion_queue.get(0)
- self.assertIsNone(client_timeout_none_event)
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
- def testNoEcho(self):
- self._perform_echo_test(())
-
- def testOneByteEcho(self):
- self._perform_echo_test([b'\x07'])
-
- def testOneManyByteEcho(self):
- self._perform_echo_test([_BYTE_SEQUENCE])
-
- def testManyOneByteEchoes(self):
- self._perform_echo_test(_BYTE_SEQUENCE)
-
- def testManyManyByteEchoes(self):
- self._perform_echo_test(_BYTE_SEQUENCE_SEQUENCE)
-
-class CancellationTest(unittest.TestCase):
+class InsecureServerInsecureClient(unittest.TestCase):
def setUp(self):
- self.host = 'localhost'
+ self.server_completion_queue = _low.CompletionQueue()
+ self.server = _low.Server(self.server_completion_queue, [])
+ self.port = self.server.add_http2_port('[::]:0')
+ self.client_completion_queue = _low.CompletionQueue()
+ self.client_channel = _low.Channel('localhost:%d'%self.port, [])
- self.completion_queue = _low.CompletionQueue()
- self.server = _low.Server(self.completion_queue)
- port = self.server.add_http2_addr('[::]:0')
self.server.start()
- self.channel = _low.Channel('%s:%d' % (self.host, port), None)
-
def tearDown(self):
- self.server.stop()
+ self.server.shutdown()
+ del self.client_channel
del self.server
- self.completion_queue.stop()
- while True:
- event = self.completion_queue.get(0)
- if event is not None and event.kind is _low.Event.Kind.STOP:
- break
-
- def testCancellation(self):
- method = 'test method'
- deadline = _FUTURE
- metadata_tag = object()
- client_finish_tag = object()
- server_finish_tag = object()
- write_tag = object()
- service_tag = object()
- read_tag = object()
- test_data = _BYTE_SEQUENCE_SEQUENCE
-
- server_data = []
- client_data = []
-
- client_call = _low.Call(self.channel, self.completion_queue,
- method, self.host, deadline)
-
- client_call.invoke(self.completion_queue, metadata_tag, client_finish_tag)
-
- self.server.service(service_tag)
- service_accepted = self.completion_queue.get(_FUTURE)
- server_call = service_accepted.service_acceptance.call
-
- server_call.accept(self.completion_queue, server_finish_tag)
- server_call.premetadata()
-
- metadata_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(metadata_accepted)
-
- for datum in test_data:
- client_call.write(datum, write_tag)
- write_accepted = self.completion_queue.get(_FUTURE)
-
- server_call.read(read_tag)
- read_accepted = self.completion_queue.get(_FUTURE)
- server_data.append(read_accepted.bytes)
-
- server_call.write(read_accepted.bytes, write_tag)
- write_accepted = self.completion_queue.get(_FUTURE)
- self.assertIsNotNone(write_accepted)
-
- client_call.read(read_tag)
- read_accepted = self.completion_queue.get(_FUTURE)
- client_data.append(read_accepted.bytes)
-
- client_call.cancel()
- # cancel() is idempotent.
- client_call.cancel()
- client_call.cancel()
- client_call.cancel()
-
- server_call.read(read_tag)
-
- events = dict((ev.tag, ev) for ev in [
- self.completion_queue.get(_FUTURE),
- self.completion_queue.get(_FUTURE),
- self.completion_queue.get(_FUTURE)])
- read_accepted = events[read_tag]
- rpc_accepted = events[server_finish_tag]
- finish_event = events[client_finish_tag]
- self.assertIsNotNone(read_accepted)
- self.assertIsNotNone(rpc_accepted)
- self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
- self.assertIsNone(read_accepted.bytes)
- self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
-
- self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
- self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'),
- finish_event.status)
-
- server_timeout_none_event = self.completion_queue.get(0)
- self.assertIsNone(server_timeout_none_event)
- client_timeout_none_event = self.completion_queue.get(0)
- self.assertIsNone(client_timeout_none_event)
-
- self.assertSequenceEqual(test_data, server_data)
- self.assertSequenceEqual(test_data, client_data)
-
-
-class ExpirationTest(unittest.TestCase):
- @unittest.skip('TODO(nathaniel): Expiration test!')
- def testExpiration(self):
- pass
+ self.client_completion_queue.shutdown()
+ while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ pass
+ self.server_completion_queue.shutdown()
+ while self.server_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN:
+ pass
+
+ del self.client_completion_queue
+ del self.server_completion_queue
+
+ def testEcho(self):
+ DEADLINE = time.time()+5
+ DEADLINE_TOLERANCE = 0.25
+ CLIENT_METADATA_ASCII_KEY = 'key'
+ CLIENT_METADATA_ASCII_VALUE = 'val'
+ CLIENT_METADATA_BIN_KEY = 'key-bin'
+ CLIENT_METADATA_BIN_VALUE = b'\0'*1000
+ SERVER_INITIAL_METADATA_KEY = 'init_me_me_me'
+ SERVER_INITIAL_METADATA_VALUE = 'whodawha?'
+ SERVER_TRAILING_METADATA_KEY = 'California_is_in_a_drought'
+ SERVER_TRAILING_METADATA_VALUE = 'zomg it is'
+ SERVER_STATUS_CODE = _types.StatusCode.OK
+ SERVER_STATUS_DETAILS = 'our work is never over'
+ REQUEST = 'in death a member of project mayhem has a name'
+ RESPONSE = 'his name is robert paulson'
+ METHOD = 'twinkies'
+ HOST = 'hostess'
+ server_request_tag = object()
+ request_call_result = self.server.request_call(self.server_completion_queue, server_request_tag)
+
+ self.assertEquals(_types.CallError.OK, request_call_result)
+
+ client_call_tag = object()
+ client_call = self.client_channel.create_call(self.client_completion_queue, METHOD, HOST, DEADLINE)
+ client_initial_metadata = [(CLIENT_METADATA_ASCII_KEY, CLIENT_METADATA_ASCII_VALUE), (CLIENT_METADATA_BIN_KEY, CLIENT_METADATA_BIN_VALUE)]
+ client_start_batch_result = client_call.start_batch([
+ _types.OpArgs.send_initial_metadata(client_initial_metadata),
+ _types.OpArgs.send_message(REQUEST),
+ _types.OpArgs.send_close_from_client(),
+ _types.OpArgs.recv_initial_metadata(),
+ _types.OpArgs.recv_message(),
+ _types.OpArgs.recv_status_on_client()
+ ], client_call_tag)
+ self.assertEquals(_types.CallError.OK, client_start_batch_result)
+
+ request_event = self.server_completion_queue.next(DEADLINE)
+ self.assertEquals(_types.EventType.OP_COMPLETE, request_event.type)
+ self.assertIsInstance(request_event.call, _low.Call)
+ self.assertIs(server_request_tag, request_event.tag)
+ self.assertEquals(1, len(request_event.results))
+ self.assertEquals(dict(client_initial_metadata), dict(request_event.results[0].initial_metadata))
+ self.assertEquals(METHOD, request_event.call_details.method)
+ self.assertEquals(HOST, request_event.call_details.host)
+ self.assertLess(abs(DEADLINE - request_event.call_details.deadline), DEADLINE_TOLERANCE)
+
+ server_call_tag = object()
+ server_call = request_event.call
+ server_initial_metadata = [(SERVER_INITIAL_METADATA_KEY, SERVER_INITIAL_METADATA_VALUE)]
+ server_trailing_metadata = [(SERVER_TRAILING_METADATA_KEY, SERVER_TRAILING_METADATA_VALUE)]
+ server_start_batch_result = server_call.start_batch([
+ _types.OpArgs.send_initial_metadata(server_initial_metadata),
+ _types.OpArgs.recv_message(),
+ _types.OpArgs.send_message(RESPONSE),
+ _types.OpArgs.recv_close_on_server(),
+ _types.OpArgs.send_status_from_server(server_trailing_metadata, SERVER_STATUS_CODE, SERVER_STATUS_DETAILS)
+ ], server_call_tag)
+ self.assertEquals(_types.CallError.OK, server_start_batch_result)
+
+ client_event = self.client_completion_queue.next(DEADLINE)
+ server_event = self.server_completion_queue.next(DEADLINE)
+
+ self.assertEquals(6, len(client_event.results))
+ found_client_op_types = set()
+ for client_result in client_event.results:
+ self.assertNotIn(client_result.type, found_client_op_types) # we expect each op type to be unique
+ found_client_op_types.add(client_result.type)
+ if client_result.type == _types.OpType.RECV_INITIAL_METADATA:
+ self.assertEquals(dict(server_initial_metadata), dict(client_result.initial_metadata))
+ elif client_result.type == _types.OpType.RECV_MESSAGE:
+ self.assertEquals(RESPONSE, client_result.message)
+ elif client_result.type == _types.OpType.RECV_STATUS_ON_CLIENT:
+ self.assertEquals(dict(server_trailing_metadata), dict(client_result.trailing_metadata))
+ self.assertEquals(SERVER_STATUS_DETAILS, client_result.status.details)
+ self.assertEquals(SERVER_STATUS_CODE, client_result.status.code)
+ self.assertEquals(set([
+ _types.OpType.SEND_INITIAL_METADATA,
+ _types.OpType.SEND_MESSAGE,
+ _types.OpType.SEND_CLOSE_FROM_CLIENT,
+ _types.OpType.RECV_INITIAL_METADATA,
+ _types.OpType.RECV_MESSAGE,
+ _types.OpType.RECV_STATUS_ON_CLIENT
+ ]), found_client_op_types)
+
+ self.assertEquals(5, len(server_event.results))
+ found_server_op_types = set()
+ for server_result in server_event.results:
+ self.assertNotIn(client_result.type, found_server_op_types)
+ found_server_op_types.add(server_result.type)
+ if server_result.type == _types.OpType.RECV_MESSAGE:
+ self.assertEquals(REQUEST, server_result.message)
+ elif server_result.type == _types.OpType.RECV_CLOSE_ON_SERVER:
+ self.assertFalse(server_result.cancelled)
+ self.assertEquals(set([
+ _types.OpType.SEND_INITIAL_METADATA,
+ _types.OpType.RECV_MESSAGE,
+ _types.OpType.SEND_MESSAGE,
+ _types.OpType.RECV_CLOSE_ON_SERVER,
+ _types.OpType.SEND_STATUS_FROM_SERVER
+ ]), found_server_op_types)
+
+ del client_call
+ del server_call
if __name__ == '__main__':
diff --git a/src/python/src/grpc/_adapter/_server.c b/src/python/src/grpc/_adapter/_server.c
deleted file mode 100644
index a6c20bf132..0000000000
--- a/src/python/src/grpc/_adapter/_server.c
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_server.h"
-
-#include <Python.h>
-#include <grpc/grpc.h>
-
-#include "grpc/_adapter/_call.h"
-#include "grpc/_adapter/_completion_queue.h"
-#include "grpc/_adapter/_error.h"
-#include "grpc/_adapter/_server_credentials.h"
-#include "grpc/_adapter/_tag.h"
-
-static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
- CompletionQueue *completion_queue;
- static char *kwlist[] = {"completion_queue", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist,
- &pygrpc_CompletionQueueType,
- &completion_queue)) {
- return -1;
- }
- self->c_server = grpc_server_create(NULL);
- grpc_server_register_completion_queue(self->c_server,
- completion_queue->c_completion_queue);
- self->completion_queue = completion_queue;
- Py_INCREF(completion_queue);
- return 0;
-}
-
-static void pygrpc_server_dealloc(Server *self) {
- if (self->c_server != NULL) {
- grpc_server_destroy(self->c_server);
- }
- Py_XDECREF(self->completion_queue);
- self->ob_type->tp_free((PyObject *)self);
-}
-
-static PyObject *pygrpc_server_add_http2_addr(Server *self, PyObject *args) {
- const char *addr;
- int port;
- if (!PyArg_ParseTuple(args, "s:add_http2_addr", &addr)) {
- return NULL;
- }
-
- port = grpc_server_add_http2_port(self->c_server, addr);
- if (port == 0) {
- PyErr_SetString(PyExc_RuntimeError, "Couldn't add port to server!");
- return NULL;
- }
-
- return PyInt_FromLong(port);
-}
-
-static PyObject *pygrpc_server_add_secure_http2_addr(Server *self,
- PyObject *args,
- PyObject *kwargs) {
- const char *addr;
- PyObject *server_credentials;
- static char *kwlist[] = {"addr", "server_credentials", NULL};
- int port;
-
- if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sO!:add_secure_http2_addr",
- kwlist, &addr, &pygrpc_ServerCredentialsType,
- &server_credentials)) {
- return NULL;
- }
- port = grpc_server_add_secure_http2_port(
- self->c_server, addr,
- ((ServerCredentials *)server_credentials)->c_server_credentials);
- if (port == 0) {
- PyErr_SetString(PyExc_RuntimeError, "Couldn't add port to server!");
- return NULL;
- }
- return PyInt_FromLong(port);
-}
-
-static PyObject *pygrpc_server_start(Server *self) {
- grpc_server_start(self->c_server);
-
- Py_RETURN_NONE;
-}
-
-static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) {
- grpc_call_error call_error;
- const PyObject *result;
- pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag);
- c_tag->call->completion_queue = self->completion_queue;
- c_tag->call->server = self;
- Py_INCREF(c_tag->call->completion_queue);
- Py_INCREF(c_tag->call->server);
- call_error = grpc_server_request_call(
- self->c_server, &c_tag->call->c_call, &c_tag->call->call_details,
- &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue,
- self->completion_queue->c_completion_queue, c_tag);
-
- result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
- }
- return result;
-}
-
-static PyObject *pygrpc_server_stop(Server *self) {
- grpc_server_shutdown(self->c_server);
-
- Py_RETURN_NONE;
-}
-
-static PyMethodDef methods[] = {
- {"add_http2_addr", (PyCFunction)pygrpc_server_add_http2_addr, METH_VARARGS,
- "Add an HTTP2 address."},
- {"add_secure_http2_addr", (PyCFunction)pygrpc_server_add_secure_http2_addr,
- METH_VARARGS, "Add a secure HTTP2 address."},
- {"start", (PyCFunction)pygrpc_server_start, METH_NOARGS,
- "Starts the server."},
- {"service", (PyCFunction)pygrpc_server_service, METH_O, "Services a call."},
- {"stop", (PyCFunction)pygrpc_server_stop, METH_NOARGS, "Stops the server."},
- {NULL}};
-
-static PyTypeObject pygrpc_ServerType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_gprc.Server", /*tp_name*/
- sizeof(Server), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_server_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_server.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- methods, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)pygrpc_server_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
-};
-
-int pygrpc_add_server(PyObject *module) {
- if (PyType_Ready(&pygrpc_ServerType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "Server", (PyObject *)&pygrpc_ServerType) ==
- -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_server_credentials.c b/src/python/src/grpc/_adapter/_server_credentials.c
deleted file mode 100644
index 06e6b94974..0000000000
--- a/src/python/src/grpc/_adapter/_server_credentials.c
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_server_credentials.h"
-
-#include <Python.h>
-#include <grpc/grpc_security.h>
-#include <grpc/support/alloc.h>
-
-static int pygrpc_server_credentials_init(ServerCredentials *self,
- PyObject *args, PyObject *kwds) {
- char *root_certificates;
- PyObject *pair_sequence;
- Py_ssize_t pair_count;
- grpc_ssl_pem_key_cert_pair *pairs;
- int error;
- PyObject *iterator;
- int i;
- PyObject *pair;
- static char *kwlist[] = {"root_credentials", "pair_sequence", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "zO:ServerCredentials", kwlist,
- &root_certificates, &pair_sequence)) {
- return -1;
- }
-
- pair_count = PySequence_Length(pair_sequence);
- if (pair_count == -1) {
- return -1;
- }
-
- iterator = PyObject_GetIter(pair_sequence);
- if (iterator == NULL) {
- return -1;
- }
- pairs = gpr_malloc(pair_count * sizeof(grpc_ssl_pem_key_cert_pair));
- error = 0;
- for (i = 0; i < pair_count; i++) {
- pair = PyIter_Next(iterator);
- if (pair == NULL) {
- error = 1;
- break;
- }
- if (!PyArg_ParseTuple(pair, "ss", &pairs[i].private_key,
- &pairs[i].cert_chain)) {
- error = 1;
- Py_DECREF(pair);
- break;
- }
- Py_DECREF(pair);
- }
- Py_DECREF(iterator);
-
- if (error) {
- gpr_free(pairs);
- return -1;
- } else {
- self->c_server_credentials = grpc_ssl_server_credentials_create(
- root_certificates, pairs, pair_count);
- gpr_free(pairs);
- return 0;
- }
-}
-
-static void pygrpc_server_credentials_dealloc(ServerCredentials *self) {
- if (self->c_server_credentials != NULL) {
- grpc_server_credentials_release(self->c_server_credentials);
- }
- self->ob_type->tp_free((PyObject *)self);
-}
-
-PyTypeObject pygrpc_ServerCredentialsType = {
- PyVarObject_HEAD_INIT(NULL, 0)
- "_grpc.ServerCredencials", /*tp_name*/
- sizeof(ServerCredentials), /*tp_basicsize*/
- 0, /*tp_itemsize*/
- (destructor)pygrpc_server_credentials_dealloc, /*tp_dealloc*/
- 0, /*tp_print*/
- 0, /*tp_getattr*/
- 0, /*tp_setattr*/
- 0, /*tp_compare*/
- 0, /*tp_repr*/
- 0, /*tp_as_number*/
- 0, /*tp_as_sequence*/
- 0, /*tp_as_mapping*/
- 0, /*tp_hash */
- 0, /*tp_call*/
- 0, /*tp_str*/
- 0, /*tp_getattro*/
- 0, /*tp_setattro*/
- 0, /*tp_as_buffer*/
- Py_TPFLAGS_DEFAULT, /*tp_flags*/
- "Wrapping of grpc_server_credentials.", /* tp_doc */
- 0, /* tp_traverse */
- 0, /* tp_clear */
- 0, /* tp_richcompare */
- 0, /* tp_weaklistoffset */
- 0, /* tp_iter */
- 0, /* tp_iternext */
- 0, /* tp_methods */
- 0, /* tp_members */
- 0, /* tp_getset */
- 0, /* tp_base */
- 0, /* tp_dict */
- 0, /* tp_descr_get */
- 0, /* tp_descr_set */
- 0, /* tp_dictoffset */
- (initproc)pygrpc_server_credentials_init, /* tp_init */
- 0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
-};
-
-int pygrpc_add_server_credentials(PyObject *module) {
- if (PyType_Ready(&pygrpc_ServerCredentialsType) < 0) {
- return -1;
- }
- if (PyModule_AddObject(module, "ServerCredentials",
- (PyObject *)&pygrpc_ServerCredentialsType) == -1) {
- return -1;
- }
- return 0;
-}
diff --git a/src/python/src/grpc/_adapter/_server_credentials.h b/src/python/src/grpc/_adapter/_server_credentials.h
deleted file mode 100644
index 75af934089..0000000000
--- a/src/python/src/grpc/_adapter/_server_credentials.h
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef _ADAPTER__SERVER_CREDENTIALS_H_
-#define _ADAPTER__SERVER_CREDENTIALS_H_
-
-#include <Python.h>
-#include <grpc/grpc_security.h>
-
-typedef struct {
- PyObject_HEAD
- grpc_server_credentials *c_server_credentials;
-} ServerCredentials;
-
-extern PyTypeObject pygrpc_ServerCredentialsType;
-
-int pygrpc_add_server_credentials(PyObject *module);
-
-#endif /* _ADAPTER__SERVER_CREDENTIALS_H_ */
diff --git a/src/python/src/grpc/_adapter/_tag.c b/src/python/src/grpc/_adapter/_tag.c
deleted file mode 100644
index 9c6ee19d79..0000000000
--- a/src/python/src/grpc/_adapter/_tag.c
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "grpc/_adapter/_tag.h"
-
-#include <Python.h>
-#include <grpc/grpc.h>
-#include <grpc/support/alloc.h>
-
-pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
- Call *call) {
- pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag));
- memset(self, 0, sizeof(pygrpc_tag));
- if (user_tag == NULL) {
- self->user_tag = Py_None;
- } else {
- self->user_tag = user_tag;
- }
- Py_INCREF(self->user_tag);
- self->type = type;
- self->call = call;
- Py_INCREF(call);
- return self;
-}
-
-pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) {
- return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag,
- (Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0));
-}
-
-void pygrpc_tag_destroy(pygrpc_tag *self) {
- Py_XDECREF(self->user_tag);
- Py_XDECREF(self->call);
- gpr_free(self);
-}
diff --git a/src/python/src/grpc/_adapter/_tag.h b/src/python/src/grpc/_adapter/_tag.h
deleted file mode 100644
index 64812aa7e7..0000000000
--- a/src/python/src/grpc/_adapter/_tag.h
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef _ADAPTER__TAG_H_
-#define _ADAPTER__TAG_H_
-
-#include <Python.h>
-#include <grpc/grpc.h>
-
-#include "grpc/_adapter/_call.h"
-#include "grpc/_adapter/_completion_queue.h"
-
-/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial
- replacement for its descriptive functionality until Python can move its whole
- C and C adapter stack to more closely resemble the core batching API. */
-typedef enum {
- PYGRPC_SERVER_RPC_NEW = 0,
- PYGRPC_INITIAL_METADATA = 1,
- PYGRPC_READ = 2,
- PYGRPC_WRITE_ACCEPTED = 3,
- PYGRPC_FINISH_ACCEPTED = 4,
- PYGRPC_CLIENT_METADATA_READ = 5,
- PYGRPC_FINISHED_CLIENT = 6,
- PYGRPC_FINISHED_SERVER = 7
-} pygrpc_tag_type;
-
-typedef struct {
- pygrpc_tag_type type;
- PyObject *user_tag;
-
- Call *call;
-} pygrpc_tag;
-
-pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
- Call *call);
-pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag);
-void pygrpc_tag_destroy(pygrpc_tag *self);
-
-#endif /* _ADAPTER__TAG_H_ */
-
diff --git a/src/python/src/grpc/_adapter/_types.py b/src/python/src/grpc/_adapter/_types.py
new file mode 100644
index 0000000000..5ddb1774ea
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_types.py
@@ -0,0 +1,368 @@
+# Copyright 2015, Google Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above
+# copyright notice, this list of conditions and the following disclaimer
+# in the documentation and/or other materials provided with the
+# distribution.
+# * Neither the name of Google Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+import abc
+import collections
+import enum
+
+# TODO(atash): decide whether or not to move these enums to the _c module to
+# force build errors with upstream changes.
+
+class GrpcChannelArgumentKeys(enum.Enum):
+ """Mirrors keys used in grpc_channel_args for GRPC-specific arguments."""
+ SSL_TARGET_NAME_OVERRIDE = 'grpc.ssl_target_name_override'
+
+@enum.unique
+class CallError(enum.IntEnum):
+ """Mirrors grpc_call_error in the C core."""
+ OK = 0
+ ERROR = 1
+ ERROR_NOT_ON_SERVER = 2
+ ERROR_NOT_ON_CLIENT = 3
+ ERROR_ALREADY_ACCEPTED = 4
+ ERROR_ALREADY_INVOKED = 5
+ ERROR_NOT_INVOKED = 6
+ ERROR_ALREADY_FINISHED = 7
+ ERROR_TOO_MANY_OPERATIONS = 8
+ ERROR_INVALID_FLAGS = 9
+ ERROR_INVALID_METADATA = 10
+
+@enum.unique
+class StatusCode(enum.IntEnum):
+ """Mirrors grpc_status_code in the C core."""
+ OK = 0
+ CANCELLED = 1
+ UNKNOWN = 2
+ INVALID_ARGUMENT = 3
+ DEADLINE_EXCEEDED = 4
+ NOT_FOUND = 5
+ ALREADY_EXISTS = 6
+ PERMISSION_DENIED = 7
+ RESOURCE_EXHAUSTED = 8
+ FAILED_PRECONDITION = 9
+ ABORTED = 10
+ OUT_OF_RANGE = 11
+ UNIMPLEMENTED = 12
+ INTERNAL = 13
+ UNAVAILABLE = 14
+ DATA_LOSS = 15
+ UNAUTHENTICATED = 16
+
+@enum.unique
+class OpType(enum.IntEnum):
+ """Mirrors grpc_op_type in the C core."""
+ SEND_INITIAL_METADATA = 0
+ SEND_MESSAGE = 1
+ SEND_CLOSE_FROM_CLIENT = 2
+ SEND_STATUS_FROM_SERVER = 3
+ RECV_INITIAL_METADATA = 4
+ RECV_MESSAGE = 5
+ RECV_STATUS_ON_CLIENT = 6
+ RECV_CLOSE_ON_SERVER = 7
+
+@enum.unique
+class EventType(enum.IntEnum):
+ """Mirrors grpc_completion_type in the C core."""
+ QUEUE_SHUTDOWN = 0
+ QUEUE_TIMEOUT = 1 # if seen on the Python side, something went horridly wrong
+ OP_COMPLETE = 2
+
+class Status(collections.namedtuple(
+ 'Status', [
+ 'code',
+ 'details',
+ ])):
+ """The end status of a GRPC call.
+
+ Attributes:
+ code (StatusCode): ...
+ details (str): ...
+ """
+
+class CallDetails(collections.namedtuple(
+ 'CallDetails', [
+ 'method',
+ 'host',
+ 'deadline',
+ ])):
+ """Provides information to the server about the client's call.
+
+ Attributes:
+ method (str): ...
+ host (str): ...
+ deadline (float): ...
+ """
+
+class OpArgs(collections.namedtuple(
+ 'OpArgs', [
+ 'type',
+ 'initial_metadata',
+ 'trailing_metadata',
+ 'message',
+ 'status',
+ ])):
+ """Arguments passed into a GRPC operation.
+
+ Attributes:
+ type (OpType): ...
+ initial_metadata (sequence of 2-sequence of str): Only valid if type ==
+ OpType.SEND_INITIAL_METADATA, else is None.
+ trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
+ OpType.SEND_STATUS_FROM_SERVER, else is None.
+ message (bytes): Only valid if type == OpType.SEND_MESSAGE, else is None.
+ status (Status): Only valid if type == OpType.SEND_STATUS_FROM_SERVER, else
+ is None.
+ """
+
+ @staticmethod
+ def send_initial_metadata(initial_metadata):
+ return OpArgs(OpType.SEND_INITIAL_METADATA, initial_metadata, None, None, None)
+
+ @staticmethod
+ def send_message(message):
+ return OpArgs(OpType.SEND_MESSAGE, None, None, message, None)
+
+ @staticmethod
+ def send_close_from_client():
+ return OpArgs(OpType.SEND_CLOSE_FROM_CLIENT, None, None, None, None)
+
+ @staticmethod
+ def send_status_from_server(trailing_metadata, status_code, status_details):
+ return OpArgs(OpType.SEND_STATUS_FROM_SERVER, None, trailing_metadata, None, Status(status_code, status_details))
+
+ @staticmethod
+ def recv_initial_metadata():
+ return OpArgs(OpType.RECV_INITIAL_METADATA, None, None, None, None);
+
+ @staticmethod
+ def recv_message():
+ return OpArgs(OpType.RECV_MESSAGE, None, None, None, None)
+
+ @staticmethod
+ def recv_status_on_client():
+ return OpArgs(OpType.RECV_STATUS_ON_CLIENT, None, None, None, None)
+
+ @staticmethod
+ def recv_close_on_server():
+ return OpArgs(OpType.RECV_CLOSE_ON_SERVER, None, None, None, None)
+
+
+class OpResult(collections.namedtuple(
+ 'OpResult', [
+ 'type',
+ 'initial_metadata',
+ 'trailing_metadata',
+ 'message',
+ 'status',
+ 'cancelled',
+ ])):
+ """Results received from a GRPC operation.
+
+ Attributes:
+ type (OpType): ...
+ initial_metadata (sequence of 2-sequence of str): Only valid if type ==
+ OpType.RECV_INITIAL_METADATA, else is None.
+ trailing_metadata (sequence of 2-sequence of str): Only valid if type ==
+ OpType.RECV_STATUS_ON_CLIENT, else is None.
+ message (bytes): Only valid if type == OpType.RECV_MESSAGE, else is None.
+ status (Status): Only valid if type == OpType.RECV_STATUS_ON_CLIENT, else
+ is None.
+ cancelled (bool): Only valid if type == OpType.RECV_CLOSE_ON_SERVER, else
+ is None.
+ """
+
+
+class Event(collections.namedtuple(
+ 'Event', [
+ 'type',
+ 'tag',
+ 'call',
+ 'call_details',
+ 'results',
+ 'success',
+ ])):
+ """An event received from a GRPC completion queue.
+
+ Attributes:
+ type (EventType): ...
+ tag (object): ...
+ call (Call): The Call object associated with this event (if there is one,
+ else None).
+ call_details (CallDetails): The call details associated with the
+ server-side call (if there is such information, else None).
+ results (list of OpResult): ...
+ success (bool): ...
+ """
+
+
+class CompletionQueue:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self):
+ pass
+
+ def __iter__(self):
+ """This class may be iterated over.
+
+ This is the equivalent of calling next() repeatedly with an absolute
+ deadline of None (i.e. no deadline).
+ """
+ return self
+
+ @abc.abstractmethod
+ def next(self, deadline=float('+inf')):
+ """Get the next event on this completion queue.
+
+ Args:
+ deadline (float): absolute deadline in seconds from the Python epoch, or
+ None for no deadline.
+
+ Returns:
+ Event: ...
+ """
+ pass
+
+ @abc.abstractmethod
+ def shutdown(self):
+ """Begin the shutdown process of this completion queue.
+
+ Note that this does not immediately destroy the completion queue.
+ Nevertheless, user code should not pass it around after invoking this.
+ """
+ return None
+
+
+class Call:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def start_batch(self, ops, tag):
+ """Start a batch of operations.
+
+ Args:
+ ops (sequence of OpArgs): ...
+ tag (object): ...
+
+ Returns:
+ CallError: ...
+ """
+ return CallError.ERROR
+
+ @abc.abstractmethod
+ def cancel(self, code=None, details=None):
+ """Cancel the call.
+
+ Args:
+ code (int): Status code to cancel with (on the server side). If
+ specified, so must `details`.
+ details (str): Status details to cancel with (on the server side). If
+ specified, so must `code`.
+
+ Returns:
+ CallError: ...
+ """
+ return CallError.ERROR
+
+
+class Channel:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self, target, args, credentials=None):
+ """Initialize a Channel.
+
+ Args:
+ target (str): ...
+ args (sequence of 2-sequence of str, (str|integer)): ...
+ credentials (ClientCredentials): If None, create an insecure channel,
+ else create a secure channel using the client credentials.
+ """
+
+ @abc.abstractmethod
+ def create_call(self, completion_queue, method, host, deadline=float('+inf')):
+ """Create a call from this channel.
+
+ Args:
+ completion_queue (CompletionQueue): ...
+ method (str): ...
+ host (str): ...
+ deadline (float): absolute deadline in seconds from the Python epoch, or
+ None for no deadline.
+
+ Returns:
+ Call: call object associated with this Channel and passed parameters.
+ """
+ return None
+
+
+class Server:
+ __metaclass__ = abc.ABCMeta
+
+ @abc.abstractmethod
+ def __init__(self, completion_queue, args):
+ """Initialize a server.
+
+ Args:
+ completion_queue (CompletionQueue): ...
+ args (sequence of 2-sequence of str, (str|integer)): ...
+ """
+
+ @abc.abstractmethod
+ def add_http2_port(self, address, credentials=None):
+ """Adds an HTTP/2 address+port to the server.
+
+ Args:
+ address (str): ...
+ credentials (ServerCredentials): If None, create an insecure port, else
+ create a secure port using the server credentials.
+ """
+
+ @abc.abstractmethod
+ def start(self):
+ """Starts the server."""
+
+ @abc.abstractmethod
+ def shutdown(self, tag=None):
+ """Shuts down the server. Does not immediately destroy the server.
+
+ Args:
+ tag (object): if not None, have the server place an event on its
+ completion queue notifying it when this server has completely shut down.
+ """
+
+ @abc.abstractmethod
+ def request_call(self, completion_queue, tag):
+ """Requests a call from the server on the server's completion queue.
+
+ Args:
+ completion_queue (CompletionQueue): Completion queue for the call. May be
+ the same as the server's completion queue.
+ tag (object) ...
+ """
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 69e145e3f6..7d88bda263 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -35,7 +35,7 @@ import threading
import time
from grpc._adapter import _common
-from grpc._adapter import _low
+from grpc._adapter import _intermediary_low as _low
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base import null
from grpc.framework.foundation import activated
@@ -204,7 +204,7 @@ class ForeLink(base_interfaces.ForeLink, activated.Activated):
call, sequence_number,
base_interfaces.FrontToBackTicket.Kind.CANCELLATION, None, None,
None, None, None)
- elif code is _low.Code.EXPIRED:
+ elif code is _low.Code.DEADLINE_EXCEEDED:
ticket = base_interfaces.FrontToBackTicket(
call, sequence_number,
base_interfaces.FrontToBackTicket.Kind.EXPIRATION, None, None, None,
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index b3b0b4ed32..fd6f45f7a7 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -35,7 +35,7 @@ import threading
import time
from grpc._adapter import _common
-from grpc._adapter import _low
+from grpc._adapter import _intermediary_low as _low
from grpc.framework.base import interfaces as base_interfaces
from grpc.framework.base import null
from grpc.framework.foundation import activated
@@ -195,7 +195,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
kind = base_interfaces.BackToFrontTicket.Kind.COMPLETION
elif event.status.code is _low.Code.CANCELLED:
kind = base_interfaces.BackToFrontTicket.Kind.CANCELLATION
- elif event.status.code is _low.Code.EXPIRED:
+ elif event.status.code is _low.Code.DEADLINE_EXCEEDED:
kind = base_interfaces.BackToFrontTicket.Kind.EXPIRATION
else:
kind = base_interfaces.BackToFrontTicket.Kind.TRANSMISSION_FAILURE
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index d0f4791a1e..dc655a70f9 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -34,15 +34,15 @@ import setuptools
import sys
_EXTENSION_SOURCES = (
- 'grpc/_adapter/_c.c',
- 'grpc/_adapter/_call.c',
- 'grpc/_adapter/_channel.c',
- 'grpc/_adapter/_completion_queue.c',
- 'grpc/_adapter/_error.c',
- 'grpc/_adapter/_server.c',
- 'grpc/_adapter/_client_credentials.c',
- 'grpc/_adapter/_server_credentials.c',
- 'grpc/_adapter/_tag.c'
+ 'grpc/_adapter/_c/module.c',
+ 'grpc/_adapter/_c/types.c',
+ 'grpc/_adapter/_c/utility.c',
+ 'grpc/_adapter/_c/types/client_credentials.c',
+ 'grpc/_adapter/_c/types/server_credentials.c',
+ 'grpc/_adapter/_c/types/completion_queue.c',
+ 'grpc/_adapter/_c/types/call.c',
+ 'grpc/_adapter/_c/types/channel.c',
+ 'grpc/_adapter/_c/types/server.c',
)
_EXTENSION_INCLUDE_DIRECTORIES = (