diff options
author | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-04-27 14:36:45 -0700 |
---|---|---|
committer | Masood Malekghassemi <soltanmm@users.noreply.github.com> | 2015-04-28 10:14:59 -0700 |
commit | f579e1d1760c509fed1ca7e07f2bb9af826ab505 (patch) | |
tree | 181b5b1122a3af5740d0cdd4d3a78348d11cf5a8 /src | |
parent | cac5f1d53248a81594779186765c746bc9b89a41 (diff) |
Migrate Python to batch core API
Diffstat (limited to 'src')
-rw-r--r-- | src/python/src/grpc/_adapter/_c_test.py | 5 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_call.c | 237 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_call.h | 28 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_completion_queue.c | 171 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_low_test.py | 8 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_server.c | 20 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_server.h | 4 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_tag.c | 65 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/_tag.h | 70 | ||||
-rw-r--r-- | src/python/src/grpc/_adapter/rear.py | 2 | ||||
-rw-r--r-- | src/python/src/setup.py | 1 |
11 files changed, 487 insertions, 124 deletions
diff --git a/src/python/src/grpc/_adapter/_c_test.py b/src/python/src/grpc/_adapter/_c_test.py index 437a6730cd..6e15adbda8 100644 --- a/src/python/src/grpc/_adapter/_c_test.py +++ b/src/python/src/grpc/_adapter/_c_test.py @@ -83,8 +83,11 @@ class _CTest(unittest.TestCase): _c.init() channel = _c.Channel('%s:%d' % (host, 12345), None) - call = _c.Call(channel, method, host, time.time() + _TIMEOUT) + completion_queue = _c.CompletionQueue() + call = _c.Call(channel, completion_queue, method, host, + time.time() + _TIMEOUT) del call + del completion_queue del channel _c.shut_down() diff --git a/src/python/src/grpc/_adapter/_call.c b/src/python/src/grpc/_adapter/_call.c index bf96c1a3fa..d833268fc9 100644 --- a/src/python/src/grpc/_adapter/_call.c +++ b/src/python/src/grpc/_adapter/_call.c @@ -36,90 +36,166 @@ #include <math.h> #include <Python.h> #include <grpc/grpc.h> +#include <grpc/support/alloc.h> #include "grpc/_adapter/_channel.h" #include "grpc/_adapter/_completion_queue.h" #include "grpc/_adapter/_error.h" +#include "grpc/_adapter/_tag.h" -static int pygrpc_call_init(Call *self, PyObject *args, PyObject *kwds) { - const PyObject *channel; +static PyObject *pygrpc_call_new(PyTypeObject *type, PyObject *args, PyObject *kwds) { + Call *self = (Call *)type->tp_alloc(type, 0); + Channel *channel; + CompletionQueue *completion_queue; const char *method; const char *host; double deadline; - static char *kwlist[] = {"channel", "method", "host", "deadline", NULL}; - - if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!ssd:Call", kwlist, - &pygrpc_ChannelType, &channel, &method, - &host, &deadline)) { - return -1; + static char *kwlist[] = {"channel", "completion_queue", + "method", "host", "deadline", NULL}; + + if (!PyArg_ParseTupleAndKeywords( + args, kwds, "O!O!ssd:Call", kwlist, + &pygrpc_ChannelType, &channel, + &pygrpc_CompletionQueueType, &completion_queue, + &method, &host, &deadline)) { + return NULL; } /* TODO(nathaniel): Hoist the gpr_timespec <-> PyFloat arithmetic into its own * function with its own test coverage. */ - self->c_call = grpc_channel_create_call_old( - ((Channel *)channel)->c_channel, method, host, + self->c_call = grpc_channel_create_call( + channel->c_channel, completion_queue->c_completion_queue, method, host, gpr_time_from_nanos(deadline * GPR_NS_PER_SEC)); - - return 0; + self->completion_queue = completion_queue; + Py_INCREF(self->completion_queue); + self->channel = channel; + Py_INCREF(self->channel); + grpc_call_details_init(&self->call_details); + grpc_metadata_array_init(&self->recv_metadata); + grpc_metadata_array_init(&self->recv_trailing_metadata); + self->send_metadata = NULL; + self->send_metadata_count = 0; + self->send_trailing_metadata = NULL; + self->send_trailing_metadata_count = 0; + self->send_message = NULL; + self->recv_message = NULL; + self->adding_to_trailing = 0; + + return (PyObject *)self; } static void pygrpc_call_dealloc(Call *self) { if (self->c_call != NULL) { grpc_call_destroy(self->c_call); } + Py_XDECREF(self->completion_queue); + Py_XDECREF(self->channel); + Py_XDECREF(self->server); + grpc_call_details_destroy(&self->call_details); + grpc_metadata_array_destroy(&self->recv_metadata); + grpc_metadata_array_destroy(&self->recv_trailing_metadata); + if (self->send_message) { + grpc_byte_buffer_destroy(self->send_message); + } + if (self->recv_message) { + grpc_byte_buffer_destroy(self->recv_message); + } + gpr_free(self->status_details); + gpr_free(self->send_metadata); + gpr_free(self->send_trailing_metadata); self->ob_type->tp_free((PyObject *)self); } static const PyObject *pygrpc_call_invoke(Call *self, PyObject *args) { - const PyObject *completion_queue; - const PyObject *metadata_tag; - const PyObject *finish_tag; + PyObject *completion_queue; + PyObject *metadata_tag; + PyObject *finish_tag; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_init_metadata_tag; + pygrpc_tag *c_metadata_tag; + pygrpc_tag *c_finish_tag; + grpc_op send_initial_metadata; + grpc_op recv_initial_metadata; + grpc_op recv_status_on_client; if (!(PyArg_ParseTuple(args, "O!OO:invoke", &pygrpc_CompletionQueueType, &completion_queue, &metadata_tag, &finish_tag))) { return NULL; } - - call_error = grpc_call_invoke_old( - self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, - (void *)metadata_tag, (void *)finish_tag, 0); - + send_initial_metadata.op = GRPC_OP_SEND_INITIAL_METADATA; + send_initial_metadata.data.send_initial_metadata.metadata = self->send_metadata; + send_initial_metadata.data.send_initial_metadata.count = self->send_metadata_count; + recv_initial_metadata.op = GRPC_OP_RECV_INITIAL_METADATA; + recv_initial_metadata.data.recv_initial_metadata = &self->recv_metadata; + recv_status_on_client.op = GRPC_OP_RECV_STATUS_ON_CLIENT; + recv_status_on_client.data.recv_status_on_client.trailing_metadata = &self->recv_trailing_metadata; + recv_status_on_client.data.recv_status_on_client.status = &self->status; + recv_status_on_client.data.recv_status_on_client.status_details = &self->status_details; + recv_status_on_client.data.recv_status_on_client.status_details_capacity = &self->status_details_capacity; + c_init_metadata_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self); + c_metadata_tag = pygrpc_tag_new(PYGRPC_CLIENT_METADATA_READ, metadata_tag, self); + c_finish_tag = pygrpc_tag_new(PYGRPC_FINISHED_CLIENT, finish_tag, self); + + call_error = grpc_call_start_batch(self->c_call, &send_initial_metadata, 1, c_init_metadata_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_init_metadata_tag); + pygrpc_tag_destroy(c_metadata_tag); + pygrpc_tag_destroy(c_finish_tag); + return result; + } + call_error = grpc_call_start_batch(self->c_call, &recv_initial_metadata, 1, c_metadata_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_metadata_tag); + pygrpc_tag_destroy(c_finish_tag); + return result; + } + call_error = grpc_call_start_batch(self->c_call, &recv_status_on_client, 1, c_finish_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(metadata_tag); - Py_INCREF(finish_tag); + if (result == NULL) { + pygrpc_tag_destroy(c_finish_tag); + return result; } + return result; } static const PyObject *pygrpc_call_write(Call *self, PyObject *args) { const char *bytes; int length; - const PyObject *tag; + PyObject *tag; gpr_slice slice; grpc_byte_buffer *byte_buffer; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "s#O:write", &bytes, &length, &tag))) { return NULL; } + c_tag = pygrpc_tag_new(PYGRPC_WRITE_ACCEPTED, tag, self); slice = gpr_slice_from_copied_buffer(bytes, length); byte_buffer = grpc_byte_buffer_create(&slice, 1); gpr_slice_unref(slice); - call_error = - grpc_call_start_write_old(self->c_call, byte_buffer, (void *)tag, 0); + if (self->send_message) { + grpc_byte_buffer_destroy(self->send_message); + } + self->send_message = byte_buffer; + + op.op = GRPC_OP_SEND_MESSAGE; + op.data.send_message = self->send_message; - grpc_byte_buffer_destroy(byte_buffer); + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -127,36 +203,42 @@ static const PyObject *pygrpc_call_write(Call *self, PyObject *args) { static const PyObject *pygrpc_call_complete(Call *self, PyObject *tag) { grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self); + grpc_op op; - call_error = grpc_call_writes_done_old(self->c_call, (void *)tag); + op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } static const PyObject *pygrpc_call_accept(Call *self, PyObject *args) { - const PyObject *completion_queue; - const PyObject *tag; + PyObject *completion_queue; + PyObject *tag; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "O!O:accept", &pygrpc_CompletionQueueType, &completion_queue, &tag))) { return NULL; } - call_error = grpc_call_server_accept_old( - self->c_call, ((CompletionQueue *)completion_queue)->c_completion_queue, - (void *)tag); - result = pygrpc_translate_call_error(call_error); + op.op = GRPC_OP_RECV_CLOSE_ON_SERVER; + op.data.recv_close_on_server.cancelled = &self->cancelled; + c_tag = pygrpc_tag_new(PYGRPC_FINISHED_SERVER, tag, self); - if (result != NULL) { - Py_INCREF(tag); + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } - return result; } @@ -171,24 +253,52 @@ static const PyObject *pygrpc_call_add_metadata(Call *self, PyObject *args) { metadata.key = key; metadata.value = value; metadata.value_length = value_length; - return pygrpc_translate_call_error( - grpc_call_add_metadata_old(self->c_call, &metadata, 0)); + if (self->adding_to_trailing) { + self->send_trailing_metadata = gpr_realloc(self->send_trailing_metadata, (self->send_trailing_metadata_count + 1) * sizeof(grpc_metadata)); + self->send_trailing_metadata[self->send_trailing_metadata_count] = metadata; + self->send_trailing_metadata_count = self->send_trailing_metadata_count + 1; + } else { + self->send_metadata = gpr_realloc(self->send_metadata, (self->send_metadata_count + 1) * sizeof(grpc_metadata)); + self->send_metadata[self->send_metadata_count] = metadata; + self->send_metadata_count = self->send_metadata_count + 1; + } + return pygrpc_translate_call_error(GRPC_CALL_OK); } static const PyObject *pygrpc_call_premetadata(Call *self) { - return pygrpc_translate_call_error( - grpc_call_server_end_initial_metadata_old(self->c_call, 0)); + grpc_op op; + grpc_call_error call_error; + const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_INITIAL_METADATA, NULL, self); + op.op = GRPC_OP_SEND_INITIAL_METADATA; + op.data.send_initial_metadata.metadata = self->send_metadata; + op.data.send_initial_metadata.count = self->send_metadata_count; + self->adding_to_trailing = 1; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); + result = pygrpc_translate_call_error(call_error); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); + } + return result; } static const PyObject *pygrpc_call_read(Call *self, PyObject *tag) { + grpc_op op; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag = pygrpc_tag_new(PYGRPC_READ, tag, self); - call_error = grpc_call_start_read_old(self->c_call, (void *)tag); - + op.op = GRPC_OP_RECV_MESSAGE; + if (self->recv_message) { + grpc_byte_buffer_destroy(self->recv_message); + self->recv_message = NULL; + } + op.data.recv_message = &self->recv_message; + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -197,15 +307,18 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) { PyObject *status; PyObject *code; PyObject *details; - const PyObject *tag; + PyObject *tag; grpc_status_code c_code; char *c_message; grpc_call_error call_error; const PyObject *result; + pygrpc_tag *c_tag; + grpc_op op; if (!(PyArg_ParseTuple(args, "OO:status", &status, &tag))) { return NULL; } + c_tag = pygrpc_tag_new(PYGRPC_FINISH_ACCEPTED, tag, self); code = PyObject_GetAttrString(status, "code"); if (code == NULL) { @@ -227,13 +340,21 @@ static const PyObject *pygrpc_call_status(Call *self, PyObject *args) { if (c_message == NULL) { return NULL; } - - call_error = grpc_call_start_write_status_old(self->c_call, c_code, c_message, - (void *)tag); - + if (self->status_details) { + gpr_free(self->status_details); + } + self->status_details = gpr_malloc(strlen(c_message)+1); + strcpy(self->status_details, c_message); + op.op = GRPC_OP_SEND_STATUS_FROM_SERVER; + op.data.send_status_from_server.trailing_metadata_count = self->send_trailing_metadata_count; + op.data.send_status_from_server.trailing_metadata = self->send_trailing_metadata; + op.data.send_status_from_server.status = c_code; + op.data.send_status_from_server.status_details = self->status_details; + + call_error = grpc_call_start_batch(self->c_call, &op, 1, c_tag); result = pygrpc_translate_call_error(call_error); - if (result != NULL) { - Py_INCREF(tag); + if (result == NULL) { + pygrpc_tag_destroy(c_tag); } return result; } @@ -301,9 +422,9 @@ PyTypeObject pygrpc_CallType = { 0, /* tp_descr_get */ 0, /* tp_descr_set */ 0, /* tp_dictoffset */ - (initproc)pygrpc_call_init, /* tp_init */ + 0, /* tp_init */ 0, /* tp_alloc */ - PyType_GenericNew, /* tp_new */ + pygrpc_call_new, /* tp_new */ }; int pygrpc_add_call(PyObject *module) { diff --git a/src/python/src/grpc/_adapter/_call.h b/src/python/src/grpc/_adapter/_call.h index c04a2285f7..fb9160901b 100644 --- a/src/python/src/grpc/_adapter/_call.h +++ b/src/python/src/grpc/_adapter/_call.h @@ -37,8 +37,36 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_completion_queue.h" +#include "grpc/_adapter/_channel.h" +#include "grpc/_adapter/_server.h" + typedef struct { PyObject_HEAD + + CompletionQueue *completion_queue; + Channel *channel; + Server *server; + + /* Legacy state. */ + grpc_call_details call_details; + grpc_metadata_array recv_metadata; + grpc_metadata_array recv_trailing_metadata; + grpc_metadata *send_metadata; + size_t send_metadata_count; + grpc_metadata *send_trailing_metadata; + size_t send_trailing_metadata_count; + int adding_to_trailing; + + grpc_byte_buffer *send_message; + grpc_byte_buffer *recv_message; + + grpc_status_code status; + char *status_details; + size_t status_details_capacity; + + int cancelled; + grpc_call *c_call; } Call; diff --git a/src/python/src/grpc/_adapter/_completion_queue.c b/src/python/src/grpc/_adapter/_completion_queue.c index a639eff53e..5f1cb2a3a6 100644 --- a/src/python/src/grpc/_adapter/_completion_queue.c +++ b/src/python/src/grpc/_adapter/_completion_queue.c @@ -38,6 +38,7 @@ #include <grpc/support/alloc.h> #include "grpc/_adapter/_call.h" +#include "grpc/_adapter/_tag.h" static PyObject *status_class; static PyObject *service_acceptance_class; @@ -138,74 +139,70 @@ static PyObject *pygrpc_stop_event_args(grpc_event *c_event) { } static PyObject *pygrpc_write_event_args(grpc_event *c_event) { - PyObject *write_accepted = - c_event->data.write_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(8, write_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + PyObject *write_accepted = Py_True; + return PyTuple_Pack(8, write_event_kind, user_tag, write_accepted, Py_None, Py_None, Py_None, Py_None, Py_None); } static PyObject *pygrpc_complete_event_args(grpc_event *c_event) { - PyObject *complete_accepted = - c_event->data.finish_accepted == GRPC_OP_OK ? Py_True : Py_False; - return PyTuple_Pack(8, complete_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + PyObject *complete_accepted = Py_True; + return PyTuple_Pack(8, complete_event_kind, user_tag, Py_None, complete_accepted, Py_None, Py_None, Py_None, Py_None); } static PyObject *pygrpc_service_event_args(grpc_event *c_event) { - if (c_event->data.server_rpc_new.method == NULL) { + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + if (tag->call->call_details.method == NULL) { return PyTuple_Pack( - 8, service_event_kind, c_event->tag, Py_None, Py_None, Py_None, Py_None, + 8, service_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, Py_None); } else { PyObject *method = NULL; PyObject *host = NULL; PyObject *service_deadline = NULL; - Call *call = NULL; PyObject *service_acceptance = NULL; PyObject *metadata = NULL; PyObject *event_args = NULL; - method = PyBytes_FromString(c_event->data.server_rpc_new.method); + method = PyBytes_FromString(tag->call->call_details.method); if (method == NULL) { goto error; } - host = PyBytes_FromString(c_event->data.server_rpc_new.host); + host = PyBytes_FromString(tag->call->call_details.host); if (host == NULL) { goto error; } service_deadline = - pygrpc_as_py_time(&c_event->data.server_rpc_new.deadline); + pygrpc_as_py_time(&tag->call->call_details.deadline); if (service_deadline == NULL) { goto error; } - call = PyObject_New(Call, &pygrpc_CallType); - if (call == NULL) { - goto error; - } - call->c_call = c_event->call; - service_acceptance = - PyObject_CallFunctionObjArgs(service_acceptance_class, call, method, - host, service_deadline, NULL); + PyObject_CallFunctionObjArgs(service_acceptance_class, tag->call, + method, host, service_deadline, NULL); if (service_acceptance == NULL) { goto error; } metadata = pygrpc_metadata_collection_get( - c_event->data.server_rpc_new.metadata_elements, - c_event->data.server_rpc_new.metadata_count); + tag->call->recv_metadata.metadata, + tag->call->recv_metadata.count); event_args = PyTuple_Pack(8, service_event_kind, - (PyObject *)c_event->tag, Py_None, Py_None, + user_tag, Py_None, Py_None, service_acceptance, Py_None, Py_None, metadata); Py_DECREF(service_acceptance); Py_DECREF(metadata); error: - Py_XDECREF(call); Py_XDECREF(method); Py_XDECREF(host); Py_XDECREF(service_deadline); @@ -215,8 +212,10 @@ error: } static PyObject *pygrpc_read_event_args(grpc_event *c_event) { - if (c_event->data.read == NULL) { - return PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + if (tag->call->recv_message == NULL) { + return PyTuple_Pack(8, read_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, Py_None); } else { size_t length; @@ -227,8 +226,8 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { PyObject *bytes; PyObject *event_args; - length = grpc_byte_buffer_length(c_event->data.read); - reader = grpc_byte_buffer_reader_create(c_event->data.read); + length = grpc_byte_buffer_length(tag->call->recv_message); + reader = grpc_byte_buffer_reader_create(tag->call->recv_message); c_bytes = gpr_malloc(length); offset = 0; while (grpc_byte_buffer_reader_next(reader, &slice)) { @@ -242,7 +241,7 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { if (bytes == NULL) { return NULL; } - event_args = PyTuple_Pack(8, read_event_kind, (PyObject *)c_event->tag, + event_args = PyTuple_Pack(8, read_event_kind, user_tag, Py_None, Py_None, Py_None, bytes, Py_None, Py_None); Py_DECREF(bytes); @@ -251,32 +250,65 @@ static PyObject *pygrpc_read_event_args(grpc_event *c_event) { } static PyObject *pygrpc_metadata_event_args(grpc_event *c_event) { + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; PyObject *metadata = pygrpc_metadata_collection_get( - c_event->data.client_metadata_read.elements, - c_event->data.client_metadata_read.count); + tag->call->recv_metadata.metadata, + tag->call->recv_metadata.count); PyObject* result = PyTuple_Pack( - 8, metadata_event_kind, (PyObject *)c_event->tag, Py_None, Py_None, + 8, metadata_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, Py_None, metadata); Py_DECREF(metadata); return result; } -static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { +static PyObject *pygrpc_finished_server_event_args(grpc_event *c_event) { + PyObject *code; + PyObject *details; + PyObject *status; + PyObject *event_args; + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; + + code = pygrpc_status_code(tag->call->cancelled ? GRPC_STATUS_CANCELLED : GRPC_STATUS_OK); + if (code == NULL) { + PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!"); + return NULL; + } + details = PyBytes_FromString(""); + if (details == NULL) { + return NULL; + } + status = PyObject_CallFunctionObjArgs(status_class, code, details, NULL); + Py_DECREF(details); + if (status == NULL) { + return NULL; + } + event_args = PyTuple_Pack(8, finish_event_kind, user_tag, + Py_None, Py_None, Py_None, Py_None, status, + Py_None); + Py_DECREF(status); + return event_args; +} + +static PyObject *pygrpc_finished_client_event_args(grpc_event *c_event) { PyObject *code; PyObject *details; PyObject *status; PyObject *event_args; PyObject *metadata; + pygrpc_tag *tag = (pygrpc_tag *)(c_event->tag); + PyObject *user_tag = tag->user_tag; - code = pygrpc_status_code(c_event->data.finished.status); + code = pygrpc_status_code(tag->call->status); if (code == NULL) { PyErr_SetString(PyExc_RuntimeError, "Unrecognized status code!"); return NULL; } - if (c_event->data.finished.details == NULL) { + if (tag->call->status_details == NULL) { details = PyBytes_FromString(""); } else { - details = PyBytes_FromString(c_event->data.finished.details); + details = PyBytes_FromString(tag->call->status_details); } if (details == NULL) { return NULL; @@ -287,9 +319,9 @@ static PyObject *pygrpc_finished_event_args(grpc_event *c_event) { return NULL; } metadata = pygrpc_metadata_collection_get( - c_event->data.finished.metadata_elements, - c_event->data.finished.metadata_count); - event_args = PyTuple_Pack(8, finish_event_kind, (PyObject *)c_event->tag, + tag->call->recv_trailing_metadata.metadata, + tag->call->recv_trailing_metadata.count); + event_args = PyTuple_Pack(8, finish_event_kind, user_tag, Py_None, Py_None, Py_None, Py_None, status, metadata); Py_DECREF(status); @@ -348,28 +380,51 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self, Py_RETURN_NONE; } + pygrpc_tag *tag = (pygrpc_tag *)c_event->tag; + switch (c_event->type) { case GRPC_QUEUE_SHUTDOWN: event_args = pygrpc_stop_event_args(c_event); break; - case GRPC_WRITE_ACCEPTED: - event_args = pygrpc_write_event_args(c_event); - break; - case GRPC_FINISH_ACCEPTED: - event_args = pygrpc_complete_event_args(c_event); - break; - case GRPC_SERVER_RPC_NEW: - event_args = pygrpc_service_event_args(c_event); - break; - case GRPC_READ: - event_args = pygrpc_read_event_args(c_event); - break; - case GRPC_CLIENT_METADATA_READ: - event_args = pygrpc_metadata_event_args(c_event); - break; - case GRPC_FINISHED: - event_args = pygrpc_finished_event_args(c_event); + case GRPC_OP_COMPLETE: { + if (!tag) { + PyErr_SetString(PyExc_Exception, "Unrecognized event type!"); + return NULL; + } + switch (tag->type) { + case PYGRPC_INITIAL_METADATA: + if (tag) { + pygrpc_tag_destroy(tag); + } + grpc_event_finish(c_event); + return pygrpc_completion_queue_get(self, args); + case PYGRPC_WRITE_ACCEPTED: + event_args = pygrpc_write_event_args(c_event); + break; + case PYGRPC_FINISH_ACCEPTED: + event_args = pygrpc_complete_event_args(c_event); + break; + case PYGRPC_SERVER_RPC_NEW: + event_args = pygrpc_service_event_args(c_event); + break; + case PYGRPC_READ: + event_args = pygrpc_read_event_args(c_event); + break; + case PYGRPC_CLIENT_METADATA_READ: + event_args = pygrpc_metadata_event_args(c_event); + break; + case PYGRPC_FINISHED_CLIENT: + event_args = pygrpc_finished_client_event_args(c_event); + break; + case PYGRPC_FINISHED_SERVER: + event_args = pygrpc_finished_server_event_args(c_event); + break; + default: + PyErr_SetString(PyExc_Exception, "Unrecognized op event type!"); + return NULL; + } break; + } default: PyErr_SetString(PyExc_Exception, "Unrecognized event type!"); return NULL; @@ -382,7 +437,9 @@ static PyObject *pygrpc_completion_queue_get(CompletionQueue *self, event = PyObject_CallObject(event_class, event_args); Py_DECREF(event_args); - Py_XDECREF((PyObject *)c_event->tag); + if (tag) { + pygrpc_tag_destroy(tag); + } grpc_event_finish(c_event); return event; diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index 826c586a1e..09c4660a2b 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -56,7 +56,7 @@ class LonelyClientTest(unittest.TestCase): completion_queue = _low.CompletionQueue() channel = _low.Channel('%s:%d' % (host, port), None) - client_call = _low.Call(channel, method, host, deadline) + client_call = _low.Call(channel, completion_queue, method, host, deadline) client_call.invoke(completion_queue, metadata_tag, finish_tag) first_event = completion_queue.get(after_deadline) @@ -138,7 +138,8 @@ class EchoTest(unittest.TestCase): server_data = [] client_data = [] - client_call = _low.Call(self.channel, method, self.host, deadline) + client_call = _low.Call(self.channel, self.client_completion_queue, + method, self.host, deadline) client_call.add_metadata(client_metadata_key, client_metadata_value) client_call.add_metadata(client_binary_metadata_key, client_binary_metadata_value) @@ -335,7 +336,8 @@ class CancellationTest(unittest.TestCase): server_data = [] client_data = [] - client_call = _low.Call(self.channel, method, self.host, deadline) + client_call = _low.Call(self.channel, self.client_completion_queue, + method, self.host, deadline) client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag) diff --git a/src/python/src/grpc/_adapter/_server.c b/src/python/src/grpc/_adapter/_server.c index 181b6c21fc..e7c5917724 100644 --- a/src/python/src/grpc/_adapter/_server.c +++ b/src/python/src/grpc/_adapter/_server.c @@ -36,12 +36,14 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_call.h" #include "grpc/_adapter/_completion_queue.h" #include "grpc/_adapter/_error.h" #include "grpc/_adapter/_server_credentials.h" +#include "grpc/_adapter/_tag.h" static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) { - const PyObject *completion_queue; + CompletionQueue *completion_queue; static char *kwlist[] = {"completion_queue", NULL}; if (!PyArg_ParseTupleAndKeywords(args, kwds, "O!:Server", kwlist, @@ -50,7 +52,9 @@ static int pygrpc_server_init(Server *self, PyObject *args, PyObject *kwds) { return -1; } self->c_server = grpc_server_create( - ((CompletionQueue *)completion_queue)->c_completion_queue, NULL); + completion_queue->c_completion_queue, NULL); + self->completion_queue = completion_queue; + Py_INCREF(completion_queue); return 0; } @@ -58,6 +62,7 @@ static void pygrpc_server_dealloc(Server *self) { if (self->c_server != NULL) { grpc_server_destroy(self->c_server); } + Py_XDECREF(self->completion_queue); self->ob_type->tp_free((PyObject *)self); } @@ -109,8 +114,15 @@ static PyObject *pygrpc_server_start(Server *self) { static const PyObject *pygrpc_server_service(Server *self, PyObject *tag) { grpc_call_error call_error; const PyObject *result; - - call_error = grpc_server_request_call_old(self->c_server, (void *)tag); + pygrpc_tag *c_tag = pygrpc_tag_new_server_rpc_call(tag); + c_tag->call->completion_queue = self->completion_queue; + c_tag->call->server = self; + Py_INCREF(c_tag->call->completion_queue); + Py_INCREF(c_tag->call->server); + call_error = grpc_server_request_call( + self->c_server, &c_tag->call->c_call, &c_tag->call->call_details, + &c_tag->call->recv_metadata, self->completion_queue->c_completion_queue, + c_tag); result = pygrpc_translate_call_error(call_error); if (result != NULL) { diff --git a/src/python/src/grpc/_adapter/_server.h b/src/python/src/grpc/_adapter/_server.h index 4836bb638c..d31d4e678b 100644 --- a/src/python/src/grpc/_adapter/_server.h +++ b/src/python/src/grpc/_adapter/_server.h @@ -37,8 +37,12 @@ #include <Python.h> #include <grpc/grpc.h> +#include "grpc/_adapter/_completion_queue.h" + typedef struct { PyObject_HEAD + + CompletionQueue *completion_queue; grpc_server *c_server; } Server; diff --git a/src/python/src/grpc/_adapter/_tag.c b/src/python/src/grpc/_adapter/_tag.c new file mode 100644 index 0000000000..9c6ee19d79 --- /dev/null +++ b/src/python/src/grpc/_adapter/_tag.c @@ -0,0 +1,65 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#include "grpc/_adapter/_tag.h" + +#include <Python.h> +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> + +pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag, + Call *call) { + pygrpc_tag *self = (pygrpc_tag *)gpr_malloc(sizeof(pygrpc_tag)); + memset(self, 0, sizeof(pygrpc_tag)); + if (user_tag == NULL) { + self->user_tag = Py_None; + } else { + self->user_tag = user_tag; + } + Py_INCREF(self->user_tag); + self->type = type; + self->call = call; + Py_INCREF(call); + return self; +} + +pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag) { + return pygrpc_tag_new(PYGRPC_SERVER_RPC_NEW, user_tag, + (Call *)pygrpc_CallType.tp_alloc(&pygrpc_CallType, 0)); +} + +void pygrpc_tag_destroy(pygrpc_tag *self) { + Py_XDECREF(self->user_tag); + Py_XDECREF(self->call); + gpr_free(self); +} diff --git a/src/python/src/grpc/_adapter/_tag.h b/src/python/src/grpc/_adapter/_tag.h new file mode 100644 index 0000000000..82987ea102 --- /dev/null +++ b/src/python/src/grpc/_adapter/_tag.h @@ -0,0 +1,70 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef _ADAPTER__TAG_H_ +#define _ADAPTER__TAG_H_ + +#include <Python.h> +#include <grpc/grpc.h> + +#include "grpc/_adapter/_call.h" +#include "grpc/_adapter/_completion_queue.h" + +/* grpc_completion_type is becoming meaningless in grpc_event; this is a partial + replacement for its descriptive functionality until Python can move its whole + C and C adapter stack to more closely resemble the core batching API. */ +typedef enum { + PYGRPC_SERVER_RPC_NEW = 0, + PYGRPC_INITIAL_METADATA = 1, + PYGRPC_READ = 2, + PYGRPC_WRITE_ACCEPTED = 3, + PYGRPC_FINISH_ACCEPTED = 4, + PYGRPC_CLIENT_METADATA_READ = 5, + PYGRPC_FINISHED_CLIENT = 6, + PYGRPC_FINISHED_SERVER = 7, +} pygrpc_tag_type; + +typedef struct { + pygrpc_tag_type type; + PyObject *user_tag; + + Call *call; +} pygrpc_tag; + +pygrpc_tag *pygrpc_tag_new(pygrpc_tag_type type, PyObject *user_tag, + Call *call); +pygrpc_tag *pygrpc_tag_new_server_rpc_call(PyObject *user_tag); +void pygrpc_tag_destroy(pygrpc_tag *self); + +#endif /* _ADAPTER__TAG_H_ */ + diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index 2b93aa6331..dd0a486117 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -246,7 +246,7 @@ class RearLink(base_interfaces.RearLink, activated.Activated): timeout: A duration of time in seconds to allow for the RPC. """ request_serializer = self._request_serializers[name] - call = _low.Call(self._channel, name, self._host, time.time() + timeout) + call = _low.Call(self._channel, self._completion_queue, name, self._host, time.time() + timeout) if self._metadata_transformer is not None: metadata = self._metadata_transformer([]) for metadata_key, metadata_value in metadata: diff --git a/src/python/src/setup.py b/src/python/src/setup.py index 32ac41e285..7149a9eb85 100644 --- a/src/python/src/setup.py +++ b/src/python/src/setup.py @@ -42,6 +42,7 @@ _EXTENSION_SOURCES = ( 'grpc/_adapter/_server.c', 'grpc/_adapter/_client_credentials.c', 'grpc/_adapter/_server_credentials.c', + 'grpc/_adapter/_tag.c' ) _EXTENSION_INCLUDE_DIRECTORIES = ( |