aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-04-27 14:36:45 -0700
committerGravatar Masood Malekghassemi <soltanmm@users.noreply.github.com>2015-04-28 10:14:59 -0700
commitf579e1d1760c509fed1ca7e07f2bb9af826ab505 (patch)
tree181b5b1122a3af5740d0cdd4d3a78348d11cf5a8 /src
parentcac5f1d53248a81594779186765c746bc9b89a41 (diff)
Migrate Python to batch core API
Diffstat (limited to 'src')
-rw-r--r--src/python/src/grpc/_adapter/_c_test.py5
-rw-r--r--src/python/src/grpc/_adapter/_call.c237
-rw-r--r--src/python/src/grpc/_adapter/_call.h28
-rw-r--r--src/python/src/grpc/_adapter/_completion_queue.c171
-rw-r--r--src/python/src/grpc/_adapter/_low_test.py8
-rw-r--r--src/python/src/grpc/_adapter/_server.c20
-rw-r--r--src/python/src/grpc/_adapter/_server.h4
-rw-r--r--src/python/src/grpc/_adapter/_tag.c65
-rw-r--r--src/python/src/grpc/_adapter/_tag.h70
-rw-r--r--src/python/src/grpc/_adapter/rear.py2
-rw-r--r--src/python/src/setup.py1
11 files changed, 487 insertions, 124 deletions
diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py
index 437a6730cd..6e15adbda8 100644
--- a/src/python/src/grpc/_adapter/_c_test.py
+++ b/src/python/src/grpc/_adapter/_c_test.py
@@ -83,8 +83,11 @@ class _CTest(unittest.TestCase):
_c.init()
channel = _c.Channel('%s:%d' % (host, 12345), None)
- call = _c.Call(channel, method, host, time.time() + _TIMEOUT)
+ completion_queue = _c.CompletionQueue()
+ call = _c.Call(channel, completion_queue, method, host,
+ time.time() + _TIMEOUT)
del call
+ del completion_queue
del channel
_c.shut_down()
diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c
index bf96c1a3fa..d833268fc9 100644
--- a/src/python/src/grpc/_adapter/_call.c
+++ b/src/python/src/grpc/_adapter/_call.c
@@ -36,90 +36,166 @@
#include <math.h>
#include <Python.h>
#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
#include "grpc/_adapter/_channel.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
+#include "grpc/_adapter/_tag.h"
-static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) {
- const PyObject *channel;
+static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) {
+ Call *self = (Call *)type->tp_alloc(type, 0);
+ Channel *channel;
+ CompletionQueue *completion_queue;
const char *method;
const char *host;
double deadline;
- static char *kwlist[] = {"channel", "method", "host", "deadline", NULL};
-
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!ssd:Call", kwlist,
- &pygrpc_ChannelType, &channel, &method,
- &host, &deadline)) {
- return -1;
+ static char *kwlist[] = {"channel", "completion_queue",
+ "method", "host", "deadline", NULL};
+
+ if (!PyArg_ParseTupleAndKeywords(
+ args, kwds, "O!O!ssd:Call", kwlist,
+ &pygrpc_ChannelType, &channel,
+ &pygrpc_CompletionQueueType, &completion_queue,
+ &method, &host, &deadline)) {
+ return NULL;
}
/* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own
* function with its own test coverage.
*/
- self->c_call = grpc_channel_create_call_old(
- ((Channel *)channel)->c_channel, method, host,
+ self->c_call = grpc_channel_create_call(
+ channel->c_channel, completion_queue->c_completion_queue, method, host,
gpr_time_from_nanos(deadline * GPR_NS_PER_SEC));
-
- return 0;
+ self->completion_queue = completion_queue;
+ Py_INCREF(self->completion_queue);
+ self->channel = channel;
+ Py_INCREF(self->channel);
+ grpc_call_details_init(&self->call_details);
+ grpc_metadata_array_init(&self->recv_metadata);
+ grpc_metadata_array_init(&self->recv_trailing_metadata);
+ self->send_metadata = NULL;
+ self->send_metadata_count = 0;
+ self->send_trailing_metadata = NULL;
+ self->send_trailing_metadata_count = 0;
+ self->send_message = NULL;
+ self->recv_message = NULL;
+ self->adding_to_trailing = 0;
+
+ return (PyObject *)self;
}
static void pygrpc_call_dealloc(Call *self) {
if (self->c_call != NULL) {
grpc_call_destroy(self->c_call);
}
+ Py_XDECREF(self->completion_queue);
+ Py_XDECREF(self->channel);
+ Py_XDECREF(self->server);
+ grpc_call_details_destroy(&self->call_details);
+ grpc_metadata_array_destroy(&self->recv_metadata);
+ grpc_metadata_array_destroy(&self->recv_trailing_metadata);
+ if (self->send_message) {
+ grpc_byte_buffer_destroy(self->send_message);
+ }
+ if (self->recv_message) {
+ grpc_byte_buffer_destroy(self->recv_message);
+ }
+ gpr_free(self->status_details);
+ gpr_free(self->send_metadata);
+ gpr_free(self->send_trailing_metadata);
self->ob_type->tp_free((PyObject *)self);
}
static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) {
- const PyObject *completion_queue;
- const PyObject *metadata_tag;
- const PyObject *finish_tag;
+ PyObject *completion_queue;
+ PyObject *metadata_tag;
+ PyObject *finish_tag;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_init_metadata_tag;
+ pygrpc_tag *c_metadata_tag;
+ pygrpc_tag *c_finish_tag;
+ grpc_op send_initial_metadata;
+ grpc_op recv_initial_metadata;
+ grpc_op recv_status_on_client;
if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType,
&completion_queue, &metadata_tag, &finish_tag))) {
return NULL;
}
-
- call_error = grpc_call_invoke_old(
- self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
- (void *)metadata_tag, (void *)finish_tag, 0);
-
+ send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA;
+ send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata;
+ send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count;
+ recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA;
+ recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata;
+ recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
+ recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata;
+ recv_status_on_client.data.recv_status_on_client.status = &self->status;
+ recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details;
+ recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity;
+ c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
+ c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self);
+ c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self);
+
+ call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_init_metadata_tag);
+ pygrpc_tag_destroy(c_metadata_tag);
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
+ }
+ call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_metadata_tag);
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
+ }
+ call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(metadata_tag);
- Py_INCREF(finish_tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_finish_tag);
+ return result;
}
+
return result;
}
static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
const char *bytes;
int length;
- const PyObject *tag;
+ PyObject *tag;
gpr_slice slice;
grpc_byte_buffer *byte_buffer;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) {
return NULL;
}
+ c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self);
slice = gpr_slice_from_copied_buffer(bytes, length);
byte_buffer = grpc_byte_buffer_create(&slice, 1);
gpr_slice_unref(slice);
- call_error =
- grpc_call_start_write_old(self->c_call, byte_buffer, (void *)tag, 0);
+ if (self->send_message) {
+ grpc_byte_buffer_destroy(self->send_message);
+ }
+ self->send_message = byte_buffer;
+
+ op.op = GRPC_OP_SEND_MESSAGE;
+ op.data.send_message = self->send_message;
- grpc_byte_buffer_destroy(byte_buffer);
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -127,36 +203,42 @@ static const PyObject *pygrpc_call_write(Call *self, PyObject *args) {
static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
+ grpc_op op;
- call_error = grpc_call_writes_done_old(self->c_call, (void *)tag);
+ op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) {
- const PyObject *completion_queue;
- const PyObject *tag;
+ PyObject *completion_queue;
+ PyObject *tag;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType,
&completion_queue, &tag))) {
return NULL;
}
- call_error = grpc_call_server_accept_old(
- self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue,
- (void *)tag);
- result = pygrpc_translate_call_error(call_error);
+ op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
+ op.data.recv_close_on_server.cancelled = &self->cancelled;
+ c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self);
- if (result != NULL) {
- Py_INCREF(tag);
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
-
return result;
}
@@ -171,24 +253,52 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) {
metadata.key = key;
metadata.value = value;
metadata.value_length = value_length;
- return pygrpc_translate_call_error(
- grpc_call_add_metadata_old(self->c_call, &metadata, 0));
+ if (self->adding_to_trailing) {
+ self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata));
+ self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata;
+ self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1;
+ } else {
+ self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata));
+ self->send_metadata[self->send_metadata_count] = metadata;
+ self->send_metadata_count = self->send_metadata_count + 1;
+ }
+ return pygrpc_translate_call_error(GRPC_CALL_OK);
}
static const PyObject *pygrpc_call_premetadata(Call *self) {
- return pygrpc_translate_call_error(
- grpc_call_server_end_initial_metadata_old(self->c_call, 0));
+ grpc_op op;
+ grpc_call_error call_error;
+ const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self);
+ op.op = GRPC_OP_SEND_INITIAL_METADATA;
+ op.data.send_initial_metadata.metadata = self->send_metadata;
+ op.data.send_initial_metadata.count = self->send_metadata_count;
+ self->adding_to_trailing = 1;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
+ result = pygrpc_translate_call_error(call_error);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
+ }
+ return result;
}
static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) {
+ grpc_op op;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self);
- call_error = grpc_call_start_read_old(self->c_call, (void *)tag);
-
+ op.op = GRPC_OP_RECV_MESSAGE;
+ if (self->recv_message) {
+ grpc_byte_buffer_destroy(self->recv_message);
+ self->recv_message = NULL;
+ }
+ op.data.recv_message = &self->recv_message;
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -197,15 +307,18 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
PyObject *status;
PyObject *code;
PyObject *details;
- const PyObject *tag;
+ PyObject *tag;
grpc_status_code c_code;
char *c_message;
grpc_call_error call_error;
const PyObject *result;
+ pygrpc_tag *c_tag;
+ grpc_op op;
if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) {
return NULL;
}
+ c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self);
code = PyObject_GetAttrString(status, "code");
if (code == NULL) {
@@ -227,13 +340,21 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) {
if (c_message == NULL) {
return NULL;
}
-
- call_error = grpc_call_start_write_status_old(self->c_call, c_code, c_message,
- (void *)tag);
-
+ if (self->status_details) {
+ gpr_free(self->status_details);
+ }
+ self->status_details = gpr_malloc(strlen(c_message)+1);
+ strcpy(self->status_details, c_message);
+ op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
+ op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count;
+ op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata;
+ op.data.send_status_from_server.status = c_code;
+ op.data.send_status_from_server.status_details = self->status_details;
+
+ call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag);
result = pygrpc_translate_call_error(call_error);
- if (result != NULL) {
- Py_INCREF(tag);
+ if (result == NULL) {
+ pygrpc_tag_destroy(c_tag);
}
return result;
}
@@ -301,9 +422,9 @@ PyTypeObject pygrpc_CallType = {
0, /* tp_descr_get */
0, /* tp_descr_set */
0, /* tp_dictoffset */
- (initproc)pygrpc_call_init, /* tp_init */
+ 0, /* tp_init */
0, /* tp_alloc */
- PyType_GenericNew, /* tp_new */
+ pygrpc_call_new, /* tp_new */
};
int pygrpc_add_call(PyObject *module) {
diff --git a/src/python/src/grpc/_adapter/_call.h b/src/python/src/grpc/_adapter/_call.h
index c04a2285f7..fb9160901b 100644
--- a/src/python/src/grpc/_adapter/_call.h
+++ b/src/python/src/grpc/_adapter/_call.h
@@ -37,8 +37,36 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_completion_queue.h"
+#include "grpc/_adapter/_channel.h"
+#include "grpc/_adapter/_server.h"
+
typedef struct {
PyObject_HEAD
+
+ CompletionQueue *completion_queue;
+ Channel *channel;
+ Server *server;
+
+ /* Legacy state. */
+ grpc_call_details call_details;
+ grpc_metadata_array recv_metadata;
+ grpc_metadata_array recv_trailing_metadata;
+ grpc_metadata *send_metadata;
+ size_t send_metadata_count;
+ grpc_metadata *send_trailing_metadata;
+ size_t send_trailing_metadata_count;
+ int adding_to_trailing;
+
+ grpc_byte_buffer *send_message;
+ grpc_byte_buffer *recv_message;
+
+ grpc_status_code status;
+ char *status_details;
+ size_t status_details_capacity;
+
+ int cancelled;
+
grpc_call *c_call;
} Call;
diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c
index a639eff53e..5f1cb2a3a6 100644
--- a/src/python/src/grpc/_adapter/_completion_queue.c
+++ b/src/python/src/grpc/_adapter/_completion_queue.c
@@ -38,6 +38,7 @@
#include <grpc/support/alloc.h>
#include "grpc/_adapter/_call.h"
+#include "grpc/_adapter/_tag.h"
static PyObject *status_class;
static PyObject *service_acceptance_class;
@@ -138,74 +139,70 @@ static PyObject *pygrpc_stop_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_write_event_args(grpc_event *c_event) {
- PyObject *write_accepted =
- c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False;
- return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ PyObject *write_accepted = Py_True;
+ return PyTuple_Pack(8, write_event_kind, user_tag,
write_accepted, Py_None, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_complete_event_args(grpc_event *c_event) {
- PyObject *complete_accepted =
- c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False;
- return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ PyObject *complete_accepted = Py_True;
+ return PyTuple_Pack(8, complete_event_kind, user_tag,
Py_None, complete_accepted, Py_None, Py_None, Py_None,
Py_None);
}
static PyObject *pygrpc_service_event_args(grpc_event *c_event) {
- if (c_event->data.server_rpc_new.method == NULL) {
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ if (tag->call->call_details.method == NULL) {
return PyTuple_Pack(
- 8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None,
+ 8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None,
Py_None, Py_None);
} else {
PyObject *method = NULL;
PyObject *host = NULL;
PyObject *service_deadline = NULL;
- Call *call = NULL;
PyObject *service_acceptance = NULL;
PyObject *metadata = NULL;
PyObject *event_args = NULL;
- method = PyBytes_FromString(c_event->data.server_rpc_new.method);
+ method = PyBytes_FromString(tag->call->call_details.method);
if (method == NULL) {
goto error;
}
- host = PyBytes_FromString(c_event->data.server_rpc_new.host);
+ host = PyBytes_FromString(tag->call->call_details.host);
if (host == NULL) {
goto error;
}
service_deadline =
- pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline);
+ pygrpc_as_py_time(&tag->call->call_details.deadline);
if (service_deadline == NULL) {
goto error;
}
- call = PyObject_New(Call, &pygrpc_CallType);
- if (call == NULL) {
- goto error;
- }
- call->c_call = c_event->call;
-
service_acceptance =
- PyObject_CallFunctionObjArgs(service_acceptance_class, call, method,
- host, service_deadline, NULL);
+ PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call,
+ method, host, service_deadline, NULL);
if (service_acceptance == NULL) {
goto error;
}
metadata = pygrpc_metadata_collection_get(
- c_event->data.server_rpc_new.metadata_elements,
- c_event->data.server_rpc_new.metadata_count);
+ tag->call->recv_metadata.metadata,
+ tag->call->recv_metadata.count);
event_args = PyTuple_Pack(8, service_event_kind,
- (PyObject *)c_event->tag, Py_None, Py_None,
+ user_tag, Py_None, Py_None,
service_acceptance, Py_None, Py_None,
metadata);
Py_DECREF(service_acceptance);
Py_DECREF(metadata);
error:
- Py_XDECREF(call);
Py_XDECREF(method);
Py_XDECREF(host);
Py_XDECREF(service_deadline);
@@ -215,8 +212,10 @@ error:
}
static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
- if (c_event->data.read == NULL) {
- return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+ if (tag->call->recv_message == NULL) {
+ return PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, Py_None, Py_None);
} else {
size_t length;
@@ -227,8 +226,8 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
PyObject *bytes;
PyObject *event_args;
- length = grpc_byte_buffer_length(c_event->data.read);
- reader = grpc_byte_buffer_reader_create(c_event->data.read);
+ length = grpc_byte_buffer_length(tag->call->recv_message);
+ reader = grpc_byte_buffer_reader_create(tag->call->recv_message);
c_bytes = gpr_malloc(length);
offset = 0;
while (grpc_byte_buffer_reader_next(reader, &slice)) {
@@ -242,7 +241,7 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
if (bytes == NULL) {
return NULL;
}
- event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag,
+ event_args = PyTuple_Pack(8, read_event_kind, user_tag,
Py_None, Py_None, Py_None, bytes, Py_None,
Py_None);
Py_DECREF(bytes);
@@ -251,32 +250,65 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) {
}
static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) {
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
PyObject *metadata = pygrpc_metadata_collection_get(
- c_event->data.client_metadata_read.elements,
- c_event->data.client_metadata_read.count);
+ tag->call->recv_metadata.metadata,
+ tag->call->recv_metadata.count);
PyObject* result = PyTuple_Pack(
- 8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None,
+ 8, metadata_event_kind, user_tag, Py_None, Py_None,
Py_None, Py_None, Py_None, metadata);
Py_DECREF(metadata);
return result;
}
-static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
+static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) {
+ PyObject *code;
+ PyObject *details;
+ PyObject *status;
+ PyObject *event_args;
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
+
+ code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK);
+ if (code == NULL) {
+ PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
+ return NULL;
+ }
+ details = PyBytes_FromString("");
+ if (details == NULL) {
+ return NULL;
+ }
+ status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL);
+ Py_DECREF(details);
+ if (status == NULL) {
+ return NULL;
+ }
+ event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
+ Py_None, Py_None, Py_None, Py_None, status,
+ Py_None);
+ Py_DECREF(status);
+ return event_args;
+}
+
+static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) {
PyObject *code;
PyObject *details;
PyObject *status;
PyObject *event_args;
PyObject *metadata;
+ pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag);
+ PyObject *user_tag = tag->user_tag;
- code = pygrpc_status_code(c_event->data.finished.status);
+ code = pygrpc_status_code(tag->call->status);
if (code == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!");
return NULL;
}
- if (c_event->data.finished.details == NULL) {
+ if (tag->call->status_details == NULL) {
details = PyBytes_FromString("");
} else {
- details = PyBytes_FromString(c_event->data.finished.details);
+ details = PyBytes_FromString(tag->call->status_details);
}
if (details == NULL) {
return NULL;
@@ -287,9 +319,9 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) {
return NULL;
}
metadata = pygrpc_metadata_collection_get(
- c_event->data.finished.metadata_elements,
- c_event->data.finished.metadata_count);
- event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag,
+ tag->call->recv_trailing_metadata.metadata,
+ tag->call->recv_trailing_metadata.count);
+ event_args = PyTuple_Pack(8, finish_event_kind, user_tag,
Py_None, Py_None, Py_None, Py_None, status,
metadata);
Py_DECREF(status);
@@ -348,28 +380,51 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
Py_RETURN_NONE;
}
+ pygrpc_tag *tag = (pygrpc_tag *)c_event->tag;
+
switch (c_event->type) {
case GRPC_QUEUE_SHUTDOWN:
event_args = pygrpc_stop_event_args(c_event);
break;
- case GRPC_WRITE_ACCEPTED:
- event_args = pygrpc_write_event_args(c_event);
- break;
- case GRPC_FINISH_ACCEPTED:
- event_args = pygrpc_complete_event_args(c_event);
- break;
- case GRPC_SERVER_RPC_NEW:
- event_args = pygrpc_service_event_args(c_event);
- break;
- case GRPC_READ:
- event_args = pygrpc_read_event_args(c_event);
- break;
- case GRPC_CLIENT_METADATA_READ:
- event_args = pygrpc_metadata_event_args(c_event);
- break;
- case GRPC_FINISHED:
- event_args = pygrpc_finished_event_args(c_event);
+ case GRPC_OP_COMPLETE: {
+ if (!tag) {
+ PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
+ return NULL;
+ }
+ switch (tag->type) {
+ case PYGRPC_INITIAL_METADATA:
+ if (tag) {
+ pygrpc_tag_destroy(tag);
+ }
+ grpc_event_finish(c_event);
+ return pygrpc_completion_queue_get(self, args);
+ case PYGRPC_WRITE_ACCEPTED:
+ event_args = pygrpc_write_event_args(c_event);
+ break;
+ case PYGRPC_FINISH_ACCEPTED:
+ event_args = pygrpc_complete_event_args(c_event);
+ break;
+ case PYGRPC_SERVER_RPC_NEW:
+ event_args = pygrpc_service_event_args(c_event);
+ break;
+ case PYGRPC_READ:
+ event_args = pygrpc_read_event_args(c_event);
+ break;
+ case PYGRPC_CLIENT_METADATA_READ:
+ event_args = pygrpc_metadata_event_args(c_event);
+ break;
+ case PYGRPC_FINISHED_CLIENT:
+ event_args = pygrpc_finished_client_event_args(c_event);
+ break;
+ case PYGRPC_FINISHED_SERVER:
+ event_args = pygrpc_finished_server_event_args(c_event);
+ break;
+ default:
+ PyErr_SetString(PyExc_Exception, "Unrecognized op event type!");
+ return NULL;
+ }
break;
+ }
default:
PyErr_SetString(PyExc_Exception, "Unrecognized event type!");
return NULL;
@@ -382,7 +437,9 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self,
event = PyObject_CallObject(event_class, event_args);
Py_DECREF(event_args);
- Py_XDECREF((PyObject *)c_event->tag);
+ if (tag) {
+ pygrpc_tag_destroy(tag);
+ }
grpc_event_finish(c_event);
return event;
diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py
index 826c586a1e..09c4660a2b 100644
--- a/src/python/src/grpc/_adapter/_low_test.py
+++ b/src/python/src/grpc/_adapter/_low_test.py
@@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase):
completion_queue = _low.CompletionQueue()
channel = _low.Channel('%s:%d' % (host, port), None)
- client_call = _low.Call(channel, method, host, deadline)
+ client_call = _low.Call(channel, completion_queue, method, host, deadline)
client_call.invoke(completion_queue, metadata_tag, finish_tag)
first_event = completion_queue.get(after_deadline)
@@ -138,7 +138,8 @@ class EchoTest(unittest.TestCase):
server_data = []
client_data = []
- client_call = _low.Call(self.channel, method, self.host, deadline)
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
client_call.add_metadata(client_metadata_key, client_metadata_value)
client_call.add_metadata(client_binary_metadata_key,
client_binary_metadata_value)
@@ -335,7 +336,8 @@ class CancellationTest(unittest.TestCase):
server_data = []
client_data = []
- client_call = _low.Call(self.channel, method, self.host, deadline)
+ client_call = _low.Call(self.channel, self.client_completion_queue,
+ method, self.host, deadline)
client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
diff --git a/src/python/src/grpc/_adapter/_server.c b/src/python/src/grpc/_adapter/_server.c
index 181b6c21fc..e7c5917724 100644
--- a/src/python/src/grpc/_adapter/_server.c
+++ b/src/python/src/grpc/_adapter/_server.c
@@ -36,12 +36,14 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_call.h"
#include "grpc/_adapter/_completion_queue.h"
#include "grpc/_adapter/_error.h"
#include "grpc/_adapter/_server_credentials.h"
+#include "grpc/_adapter/_tag.h"
static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
- const PyObject *completion_queue;
+ CompletionQueue *completion_queue;
static char *kwlist[] = {"completion_queue", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist,
@@ -50,7 +52,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) {
return -1;
}
self->c_server = grpc_server_create(
- ((CompletionQueue *)completion_queue)->c_completion_queue, NULL);
+ completion_queue->c_completion_queue, NULL);
+ self->completion_queue = completion_queue;
+ Py_INCREF(completion_queue);
return 0;
}
@@ -58,6 +62,7 @@ static void pygrpc_server_dealloc(Server *self) {
if (self->c_server != NULL) {
grpc_server_destroy(self->c_server);
}
+ Py_XDECREF(self->completion_queue);
self->ob_type->tp_free((PyObject *)self);
}
@@ -109,8 +114,15 @@ static PyObject *pygrpc_server_start(Server *self) {
static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) {
grpc_call_error call_error;
const PyObject *result;
-
- call_error = grpc_server_request_call_old(self->c_server, (void *)tag);
+ pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag);
+ c_tag->call->completion_queue = self->completion_queue;
+ c_tag->call->server = self;
+ Py_INCREF(c_tag->call->completion_queue);
+ Py_INCREF(c_tag->call->server);
+ call_error = grpc_server_request_call(
+ self->c_server, &c_tag->call->c_call, &c_tag->call->call_details,
+ &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue,
+ c_tag);
result = pygrpc_translate_call_error(call_error);
if (result != NULL) {
diff --git a/src/python/src/grpc/_adapter/_server.h b/src/python/src/grpc/_adapter/_server.h
index 4836bb638c..d31d4e678b 100644
--- a/src/python/src/grpc/_adapter/_server.h
+++ b/src/python/src/grpc/_adapter/_server.h
@@ -37,8 +37,12 @@
#include <Python.h>
#include <grpc/grpc.h>
+#include "grpc/_adapter/_completion_queue.h"
+
typedef struct {
PyObject_HEAD
+
+ CompletionQueue *completion_queue;
grpc_server *c_server;
} Server;
diff --git a/src/python/src/grpc/_adapter/_tag.c b/src/python/src/grpc/_adapter/_tag.c
new file mode 100644
index 0000000000..9c6ee19d79
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_tag.c
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "grpc/_adapter/_tag.h"
+
+#include <Python.h>
+#include <grpc/grpc.h>
+#include <grpc/support/alloc.h>
+
+pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
+ Call *call) {
+ pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag));
+ memset(self, 0, sizeof(pygrpc_tag));
+ if (user_tag == NULL) {
+ self->user_tag = Py_None;
+ } else {
+ self->user_tag = user_tag;
+ }
+ Py_INCREF(self->user_tag);
+ self->type = type;
+ self->call = call;
+ Py_INCREF(call);
+ return self;
+}
+
+pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) {
+ return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag,
+ (Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0));
+}
+
+void pygrpc_tag_destroy(pygrpc_tag *self) {
+ Py_XDECREF(self->user_tag);
+ Py_XDECREF(self->call);
+ gpr_free(self);
+}
diff --git a/src/python/src/grpc/_adapter/_tag.h b/src/python/src/grpc/_adapter/_tag.h
new file mode 100644
index 0000000000..82987ea102
--- /dev/null
+++ b/src/python/src/grpc/_adapter/_tag.h
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef _ADAPTER__TAG_H_
+#define _ADAPTER__TAG_H_
+
+#include <Python.h>
+#include <grpc/grpc.h>
+
+#include "grpc/_adapter/_call.h"
+#include "grpc/_adapter/_completion_queue.h"
+
+/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial
+ replacement for its descriptive functionality until Python can move its whole
+ C and C adapter stack to more closely resemble the core batching API. */
+typedef enum {
+ PYGRPC_SERVER_RPC_NEW = 0,
+ PYGRPC_INITIAL_METADATA = 1,
+ PYGRPC_READ = 2,
+ PYGRPC_WRITE_ACCEPTED = 3,
+ PYGRPC_FINISH_ACCEPTED = 4,
+ PYGRPC_CLIENT_METADATA_READ = 5,
+ PYGRPC_FINISHED_CLIENT = 6,
+ PYGRPC_FINISHED_SERVER = 7,
+} pygrpc_tag_type;
+
+typedef struct {
+ pygrpc_tag_type type;
+ PyObject *user_tag;
+
+ Call *call;
+} pygrpc_tag;
+
+pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag,
+ Call *call);
+pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag);
+void pygrpc_tag_destroy(pygrpc_tag *self);
+
+#endif /* _ADAPTER__TAG_H_ */
+
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index 2b93aa6331..dd0a486117 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -246,7 +246,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated):
timeout: A duration of time in seconds to allow for the RPC.
"""
request_serializer = self._request_serializers[name]
- call = _low.Call(self._channel, name, self._host, time.time() + timeout)
+ call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout)
if self._metadata_transformer is not None:
metadata = self._metadata_transformer([])
for metadata_key, metadata_value in metadata:
diff --git a/src/python/src/setup.py b/src/python/src/setup.py
index 32ac41e285..7149a9eb85 100644
--- a/src/python/src/setup.py
+++ b/src/python/src/setup.py
@@ -42,6 +42,7 @@ _EXTENSION_SOURCES = (
'grpc/_adapter/_server.c',
'grpc/_adapter/_client_credentials.c',
'grpc/_adapter/_server_credentials.c',
+ 'grpc/_adapter/_tag.c'
)
_EXTENSION_INCLUDE_DIRECTORIES = (