diff options
author | Craig Tiller <ctiller@google.com> | 2015-06-11 14:54:21 -0700 |
---|---|---|
committer | Craig Tiller <ctiller@google.com> | 2015-06-11 14:54:21 -0700 |
commit | 3cbfcb4dcd3cbf785f7391a8e4c5f36e6615c9d4 (patch) | |
tree | 500e29f6deb747644937e72b4a21568234c66af3 /src | |
parent | e0617624bafca93f795f12483451583764fd8c80 (diff) | |
parent | 0680527a78f5eb86e04a2e909662500172bf92a0 (diff) |
Merge branch 'we-dont-need-no-backup' into oops-i-split-it-again
Diffstat (limited to 'src')
22 files changed, 222 insertions, 113 deletions
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc index b0d2b5d229..c00c85bb90 100644 --- a/src/compiler/cpp_generator.cc +++ b/src/compiler/cpp_generator.cc @@ -849,6 +849,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::Status $ns$$Service$::Service::$Method$(" "::grpc::ServerContext* context, " "const $Request$* request, $Response$* response) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) request;\n"); + printer->Print(" (void) response;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -859,6 +862,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "::grpc::ServerReader< $Request$>* reader, " "$Response$* response) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) reader;\n"); + printer->Print(" (void) response;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -869,6 +875,9 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "const $Request$* request, " "::grpc::ServerWriter< $Response$>* writer) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) request;\n"); + printer->Print(" (void) writer;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); @@ -879,6 +888,8 @@ void PrintSourceServerMethod(grpc::protobuf::io::Printer *printer, "::grpc::ServerContext* context, " "::grpc::ServerReaderWriter< $Response$, $Request$>* " "stream) {\n"); + printer->Print(" (void) context;\n"); + printer->Print(" (void) stream;\n"); printer->Print( " return ::grpc::Status(" "::grpc::StatusCode::UNIMPLEMENTED);\n"); diff --git a/src/core/channel/client_channel.c b/src/core/channel/client_channel.c index 31b1fc3bde..726196e996 100644 --- a/src/core/channel/client_channel.c +++ b/src/core/channel/client_channel.c @@ -102,10 +102,17 @@ struct call_data { static int prepare_activate(grpc_call_element *elem, grpc_child_channel *on_child) { call_data *calld = elem->call_data; + channel_data *chand = elem->channel_data; if (calld->state == CALL_CANCELLED) return 0; /* no more access to calld->s.waiting allowed */ GPR_ASSERT(calld->state == CALL_WAITING); + + if (calld->s.waiting_op.bind_pollset) { + grpc_transport_setup_del_interested_party(chand->transport_setup, + calld->s.waiting_op.bind_pollset); + } + calld->state = CALL_ACTIVE; /* create a child call */ @@ -199,6 +206,7 @@ static void cc_start_transport_op(grpc_call_element *elem, handle_op_after_cancellation(elem, op); } else { calld->state = CALL_WAITING; + calld->s.waiting_op.bind_pollset = NULL; if (chand->active_child) { /* channel is connected - use the connected stack */ if (prepare_activate(elem, chand->active_child)) { @@ -230,14 +238,14 @@ static void cc_start_transport_op(grpc_call_element *elem, } calld->s.waiting_op = *op; chand->waiting_children[chand->waiting_child_count++] = calld; + grpc_transport_setup_add_interested_party(chand->transport_setup, + op->bind_pollset); gpr_mu_unlock(&chand->mu); /* finally initiate transport setup if needed */ if (initiate_transport_setup) { grpc_transport_setup_initiate(chand->transport_setup); } - grpc_transport_setup_add_interested_party(chand->transport_setup, - op->bind_pollset); } } break; diff --git a/src/core/channel/client_setup.c b/src/core/channel/client_setup.c index 8a318eaa86..42ee23d87f 100644 --- a/src/core/channel/client_setup.c +++ b/src/core/channel/client_setup.c @@ -56,12 +56,12 @@ struct grpc_client_setup { gpr_cv cv; grpc_client_setup_request *active_request; int refs; + grpc_pollset_set interested_parties; }; struct grpc_client_setup_request { /* pointer back to the setup object */ grpc_client_setup *setup; - grpc_pollset_set interested_parties; gpr_timespec deadline; }; @@ -71,7 +71,7 @@ gpr_timespec grpc_client_setup_request_deadline(grpc_client_setup_request *r) { grpc_pollset_set *grpc_client_setup_get_interested_parties( grpc_client_setup_request *r) { - return &r->interested_parties; + return &r->setup->interested_parties; } static void destroy_setup(grpc_client_setup *s) { @@ -79,11 +79,11 @@ static void destroy_setup(grpc_client_setup *s) { gpr_cv_destroy(&s->cv); s->done(s->user_data); grpc_channel_args_destroy(s->args); + grpc_pollset_set_destroy(&s->interested_parties); gpr_free(s); } static void destroy_request(grpc_client_setup_request *r) { - grpc_pollset_set_destroy(&r->interested_parties); gpr_free(r); } @@ -94,7 +94,6 @@ static void setup_initiate(grpc_transport_setup *sp) { int in_alarm = 0; r->setup = s; - grpc_pollset_set_init(&r->interested_parties); /* TODO(klempner): Actually set a deadline */ r->deadline = gpr_time_add(gpr_now(), gpr_time_from_seconds(60)); @@ -125,12 +124,8 @@ static void setup_add_interested_party(grpc_transport_setup *sp, grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - if (!s->active_request) { - gpr_mu_unlock(&s->mu); - return; - } - grpc_pollset_set_add_pollset(&s->active_request->interested_parties, pollset); + grpc_pollset_set_add_pollset(&s->interested_parties, pollset); gpr_mu_unlock(&s->mu); } @@ -140,12 +135,8 @@ static void setup_del_interested_party(grpc_transport_setup *sp, grpc_client_setup *s = (grpc_client_setup *)sp; gpr_mu_lock(&s->mu); - if (!s->active_request) { - gpr_mu_unlock(&s->mu); - return; - } - grpc_pollset_set_del_pollset(&s->active_request->interested_parties, pollset); + grpc_pollset_set_del_pollset(&s->interested_parties, pollset); gpr_mu_unlock(&s->mu); } @@ -225,6 +216,7 @@ void grpc_client_setup_create_and_attach( s->in_alarm = 0; s->in_cb = 0; s->cancelled = 0; + grpc_pollset_set_init(&s->interested_parties); grpc_client_channel_set_transport_setup(newly_minted_channel, &s->base); } diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index c701abb91d..db704b9df1 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -99,6 +99,7 @@ void grpc_pollset_init(grpc_pollset *pollset) { grpc_pollset_kick_init(&pollset->kick_state); pollset->in_flight_cbs = 0; pollset->shutting_down = 0; + pollset->called_shutdown = 0; become_basic_pollset(pollset, NULL); } @@ -141,7 +142,8 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { if (pollset->shutting_down) { if (pollset->counter > 0) { grpc_pollset_kick(pollset); - } else if (pollset->in_flight_cbs == 0) { + } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { + pollset->called_shutdown = 1; gpr_mu_unlock(&pollset->mu); finish_shutdown(pollset); /* Continuing to access pollset here is safe -- it is the caller's @@ -157,21 +159,22 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), void *shutdown_done_arg) { - int in_flight_cbs; - int counter; + int call_shutdown = 0; gpr_mu_lock(&pollset->mu); GPR_ASSERT(!pollset->shutting_down); pollset->shutting_down = 1; - in_flight_cbs = pollset->in_flight_cbs; - counter = pollset->counter; + if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 && pollset->counter == 0) { + pollset->called_shutdown = 1; + call_shutdown = 1; + } pollset->shutdown_done_cb = shutdown_done; pollset->shutdown_done_arg = shutdown_done_arg; - if (counter > 0) { + if (pollset->counter > 0) { grpc_pollset_kick(pollset); } gpr_mu_unlock(&pollset->mu); - if (in_flight_cbs == 0 && counter == 0) { + if (call_shutdown) { finish_shutdown(pollset); } } diff --git a/src/core/iomgr/pollset_posix.h b/src/core/iomgr/pollset_posix.h index 2b897caa4b..92c258e0cd 100644 --- a/src/core/iomgr/pollset_posix.h +++ b/src/core/iomgr/pollset_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_pollset { int counter; int in_flight_cbs; int shutting_down; + int called_shutdown; void (*shutdown_done_cb)(void *arg); void *shutdown_done_arg; union { diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 3803e648b4..641c1cd435 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -99,6 +99,8 @@ typedef enum { /* Status came from 'the wire' - or somewhere below the surface layer */ STATUS_FROM_WIRE, + /* Status came from the server sending status */ + STATUS_FROM_SERVER_STATUS, STATUS_SOURCE_COUNT } status_source; @@ -581,10 +583,18 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, call->write_state = WRITE_STATE_WRITE_CLOSED; } break; + case GRPC_IOREQ_SEND_STATUS: + if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details != + NULL) { + grpc_mdstr_unref( + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; + } + break; case GRPC_IOREQ_RECV_CLOSE: case GRPC_IOREQ_SEND_INITIAL_METADATA: case GRPC_IOREQ_SEND_TRAILING_METADATA: - case GRPC_IOREQ_SEND_STATUS: case GRPC_IOREQ_SEND_CLOSE: break; case GRPC_IOREQ_RECV_STATUS: @@ -907,8 +917,9 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { call->metadata_context, grpc_mdstr_ref( grpc_channel_get_message_string(call->channel)), - grpc_mdstr_from_string(call->metadata_context, - data.send_status.details))); + data.send_status.details)); + call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details = + NULL; } grpc_sopb_add_metadata(&call->send_ops, mdb); } @@ -1008,6 +1019,14 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, GRPC_CALL_ERROR_INVALID_METADATA); } } + if (op == GRPC_IOREQ_SEND_STATUS) { + set_status_code(call, STATUS_FROM_SERVER_STATUS, + reqs[i].data.send_status.code); + if (reqs[i].data.send_status.details) { + set_status_details(call, STATUS_FROM_SERVER_STATUS, + grpc_mdstr_ref(reqs[i].data.send_status.details)); + } + } have_ops |= 1u << op; call->request_data[op] = data; @@ -1283,7 +1302,11 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, req->op = GRPC_IOREQ_SEND_STATUS; req->data.send_status.code = op->data.send_status_from_server.status; req->data.send_status.details = - op->data.send_status_from_server.status_details; + op->data.send_status_from_server.status_details != NULL + ? grpc_mdstr_from_string( + call->metadata_context, + op->data.send_status_from_server.status_details) + : NULL; req = &reqs[out++]; req->op = GRPC_IOREQ_SEND_CLOSE; break; diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 17db8c2cdc..7ea6cd7fa9 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -72,7 +72,7 @@ typedef union { grpc_byte_buffer *send_message; struct { grpc_status_code code; - const char *details; + grpc_mdstr *details; } send_status; } grpc_ioreq_data; diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 525fe2e103..3671efe0d0 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -1010,7 +1010,7 @@ void grpc_server_destroy(grpc_server *server) { listener *l; gpr_mu_lock(&server->mu); - GPR_ASSERT(server->shutdown); + GPR_ASSERT(server->shutdown || !server->listeners); GPR_ASSERT(server->listeners_destroyed == num_listeners(server)); while (server->listeners) { diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h index 5496504229..5215cc87b1 100644 --- a/src/core/transport/stream_op.h +++ b/src/core/transport/stream_op.h @@ -58,7 +58,7 @@ typedef enum grpc_stream_op_code { GRPC_OP_SLICE } grpc_stream_op_code; -/* Arguments for GRPC_OP_BEGIN */ +/* Arguments for GRPC_OP_BEGIN_MESSAGE */ typedef struct grpc_begin_message { /* How many bytes of data will this message contain */ gpr_uint32 length; diff --git a/src/php/README.md b/src/php/README.md index 40c79e0dd4..cb9b48aee3 100644 --- a/src/php/README.md +++ b/src/php/README.md @@ -7,51 +7,122 @@ This directory contains source code for PHP implementation of gRPC layered on sh Pre-Alpha : This gRPC PHP implementation is work-in-progress and is not expected to work yet. - -## LAYOUT - -Directory structure is as generated by the PHP utility -[ext_skel](http://php.net/manual/en/internals2.buildsys.skeleton.php) - ## ENVIRONMENT Install `php5` and `php5-dev`. -To run the tests, additionally install `php5-readline` and `phpunit`. +To run the tests, additionally install `phpunit`. Alternatively, build and install PHP 5.5 or later from source with standard configuration options. -To also download and install protoc and the PHP code generator. +## Build from Homebrew + +On Mac OS X, install [homebrew][]. On Linux, install [linuxbrew][]. Run the following command to +install gRPC. + +```sh +$ curl -fsSL https://goo.gl/getgrpc | bash -s php +``` + +This will download and run the [gRPC install script][] and compile the gRPC PHP extension. + +## Build from Source + +Clone this repository + +``` +$ git clone https://github.com/grpc/grpc.git +``` + +Build and install the Protocol Buffers compiler (protoc) + +``` +$ cd grpc +$ git pull --recurse-submodules && git submodule update --init --recursive +$ cd third_party/protobuf +$ ./autogen.sh +$ ./configure +$ make +$ make check +$ sudo make install +``` + +Build and install the gRPC C core + +```sh +$ cd grpc +$ make +$ sudo make install +``` + +Build the gRPC PHP extension -```bash -apt-get install -y procps -curl -sSL https://get.rvm.io | sudo bash -s stable --ruby -git clone git@github.com:google/protobuf.git -cd protobuf -./configure -make -make install -git clone git@github.com:murgatroid99/Protobuf-PHP.git -cd Protobuf-PHP -rake pear:package version=1.0 -pear install Protobuf-1.0.tgz +```sh +$ cd grpc/src/php/ext/grpc +$ phpize +$ ./configure +$ make +$ sudo make install ``` -## BUILDING +In your php.ini file, add the line `extension=grpc.so` to load the extension +at PHP startup. - 1. In ./ext/grpc, run the command `phpize` (distributed with PHP) - 2. Run `./ext/grpc/configure` - 3. In ./ext/grpc, run `make` and `sudo make install` - 4. In your php.ini file, add the line `extension=grpc.so` to load the - extension at PHP startup. +Install Composer -## PHPUnit +```sh +$ cd grpc/src/php +$ curl -sS https://getcomposer.org/installer | php +$ php composer.phar install +``` + +## Unit Tests + +Run unit tests + +```sh +$ cd grpc/src/php +$ ./bin/run_tests.sh +``` + +## Generated Code Tests + +Install `protoc-gen-php` + +```sh +$ cd grpc/src/php/vendor/datto/protobuf-php +$ gem install rake ronn +$ rake pear:package version=1.0 +$ sudo pear install Protobuf-1.0.tgz +``` + +Generate client stub code + +```sh +$ cd grpc/src/php +$ ./bin/generate_proto_php.sh +``` + +Run a local server serving the math services + + - Please see [Node][] on how to run an example server + +```sh +$ cd grpc/src/node +$ npm install +$ nodejs examples/math_server.js +``` + +Run the generated code tests + +```sh +$ cd grpc/src/php +$ ./bin/run_gen_code_test.sh +``` -This repo now has PHPUnit tests, which can by run by executing -`./bin/run_tests.sh` after building. +[homebrew]:http://brew.sh +[linuxbrew]:https://github.com/Homebrew/linuxbrew#installation +[gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Node]:https://github.com/grpc/grpc/tree/master/src/node/examples -There is also a generated code test (`./bin/run_gen_code_test.sh`), which tests -the stub `./tests/generated_code/math.php` against a running localhost server -serving the math service. That stub is generated from -`./tests/generated_code/math.proto`. diff --git a/src/php/bin/run_gen_code_test.sh b/src/php/bin/run_gen_code_test.sh index 4882a2b846..1be2ed3f72 100755 --- a/src/php/bin/run_gen_code_test.sh +++ b/src/php/bin/run_gen_code_test.sh @@ -30,8 +30,8 @@ cd $(dirname $0) GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ - -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ + -d extension=grpc.so `which phpunit` -v --debug --strict \ ../tests/generated_code/GeneratedCodeTest.php GRPC_TEST_HOST=localhost:50051 php -d extension_dir=../ext/grpc/modules/ \ - -d extension=grpc.so /usr/local/bin/phpunit -v --debug --strict \ + -d extension=grpc.so `which phpunit` -v --debug --strict \ ../tests/generated_code/GeneratedCodeWithCallbackTest.php diff --git a/src/php/composer.json b/src/php/composer.json index ba7a1302f2..b0115bdadd 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -4,8 +4,15 @@ "version": "0.5.0", "homepage": "http://grpc.io", "license": "BSD-3-Clause", + "repositories": [ + { + "type": "vcs", + "url": "https://github.com/stanley-cheung/Protobuf-PHP" + } + ], "require": { "php": ">=5.5.0", + "datto/protobuf-php": "dev-master", "google/auth": "dev-master" }, "autoload": { diff --git a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php index 2d2352b199..6102aaf0a8 100644 --- a/src/php/tests/generated_code/AbstractGeneratedCodeTest.php +++ b/src/php/tests/generated_code/AbstractGeneratedCodeTest.php @@ -32,8 +32,6 @@ * */ require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php'); -require 'DrSlump/Protobuf.php'; -\DrSlump\Protobuf::autoload(); require 'math.php'; abstract class AbstractGeneratedCodeTest extends PHPUnit_Framework_TestCase { /* These tests require that a server exporting the math service must be diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index bf8d0cd93c..9aee01cd4d 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -32,8 +32,6 @@ * */ require_once realpath(dirname(__FILE__) . '/../../vendor/autoload.php'); -require 'DrSlump/Protobuf.php'; -\DrSlump\Protobuf::autoload(); require 'empty.php'; require 'message_set.php'; require 'messages.php'; diff --git a/src/python/README.md b/src/python/README.md index 0bca457a33..2beb3a913a 100644 --- a/src/python/README.md +++ b/src/python/README.md @@ -20,6 +20,10 @@ $ curl -fsSL https://goo.gl/getgrpc | bash -s python ``` This will download and run the [gRPC install script][], then install the latest version of the gRPC Python package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python. +EXAMPLES +-------- +Please read our online documentation for a [Quick Start][] and a [detailed example][] + BUILDING FROM SOURCE --------------------- - Clone this repository @@ -58,3 +62,5 @@ $ ../../tools/distrib/python/submit.py [homebrew]:http://brew.sh [linuxbrew]:https://github.com/Homebrew/linuxbrew#installation [gRPC install script]:https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install +[Quick Start]:http://www.grpc.io/docs/tutorials/basic/python.html +[detailed example]:http://www.grpc.io/docs/installation/python.html diff --git a/src/python/src/README.rst b/src/python/src/README.rst index bc1815febc..00bdecf56f 100644 --- a/src/python/src/README.rst +++ b/src/python/src/README.rst @@ -6,22 +6,18 @@ Package for GRPC Python. Dependencies ------------ -Ensure that you have installed GRPC core. - -On debian linux systems, install from our released deb package: +Ensure you have installed the gRPC core. On Mac OS X, install homebrew_. On Linux, install linuxbrew_. +Run the following command to install gRPC Python. :: - $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc_0.5.0_amd64.deb - $ wget https://github.com/grpc/grpc/releases/download/release-0_5_0/libgrpc-dev_0.5.0_amd64.deb - $ sudo dpkg -i libgrpc_0.5.0_amd64.deb libgrpc-dev_0.5.0_amd64.deb - -Otherwise, install from source: + $ curl -fsSL https://goo.gl/getgrpc | bash -s python -:: +This will download and run the [gRPC install script][] to install grpc core. The script then uses pip to install this package. It also installs the Protocol Buffers compiler (_protoc_) and the gRPC _protoc_ plugin for python. - git clone https://github.com/grpc/grpc.git - cd grpc - ./configure - make && make install +Otherwise, `install from source`_ +.. _`install from source`: https://github.com/grpc/grpc/blob/master/src/python/README.md#building-from-source +.. _homebrew: http://brew.sh +.. _linuxbrew: https://github.com/Homebrew/linuxbrew#installation +.. _`gRPC install script`: https://raw.githubusercontent.com/grpc/homebrew-grpc/master/scripts/install diff --git a/src/python/src/grpc/_adapter/_c/types/server.c b/src/python/src/grpc/_adapter/_c/types/server.c index 65d84b58fe..26b38da8f1 100644 --- a/src/python/src/grpc/_adapter/_c/types/server.c +++ b/src/python/src/grpc/_adapter/_c/types/server.c @@ -167,17 +167,13 @@ PyObject *pygrpc_Server_start(Server *self, PyObject *ignored) { PyObject *pygrpc_Server_shutdown( Server *self, PyObject *args, PyObject *kwargs) { - PyObject *user_tag = NULL; + PyObject *user_tag; pygrpc_tag *tag; static char *keywords[] = {"tag", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|O", keywords, &user_tag)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O", keywords, &user_tag)) { return NULL; } - if (user_tag) { - tag = pygrpc_produce_server_shutdown_tag(user_tag); - grpc_server_shutdown_and_notify(self->c_serv, tag); - } else { - grpc_server_shutdown(self->c_serv); - } + tag = pygrpc_produce_server_shutdown_tag(user_tag); + grpc_server_shutdown_and_notify(self->c_serv, self->cq->c_cq, tag); Py_RETURN_NONE; } diff --git a/src/python/src/grpc/_adapter/_c/utility.c b/src/python/src/grpc/_adapter/_c/utility.c index 6722b53f84..cfb8270832 100644 --- a/src/python/src/grpc/_adapter/_c/utility.c +++ b/src/python/src/grpc/_adapter/_c/utility.c @@ -40,6 +40,8 @@ #include <grpc/support/alloc.h> #include <grpc/support/slice.h> #include <grpc/support/time.h> +#include <grpc/support/string_util.h> +#include <grpc/support/log.h> #include "grpc/_adapter/_c/types.h" @@ -122,7 +124,8 @@ PyObject *pygrpc_consume_event(grpc_event event) { event.success ? Py_True : Py_False); } else { result = Py_BuildValue("iOOONO", GRPC_OP_COMPLETE, tag->user_tag, - tag->call, Py_None, pygrpc_consume_ops(tag->ops, tag->nops), + tag->call ? tag->call : Py_None, Py_None, + pygrpc_consume_ops(tag->ops, tag->nops), event.success ? Py_True : Py_False); } break; @@ -156,9 +159,10 @@ int pygrpc_produce_op(PyObject *op, grpc_op *result) { return 0; } if (PyTuple_Size(op) != OP_TUPLE_SIZE) { - char buf[64]; - snprintf(buf, sizeof(buf), "expected tuple op of length %d", OP_TUPLE_SIZE); + char *buf; + gpr_asprintf(&buf, "expected tuple op of length %d", OP_TUPLE_SIZE); PyErr_SetString(PyExc_ValueError, buf); + gpr_free(buf); return 0; } type = PyInt_AsLong(PyTuple_GET_ITEM(op, TYPE_INDEX)); @@ -353,9 +357,14 @@ double pygrpc_cast_gpr_timespec_to_double(gpr_timespec timespec) { return timespec.tv_sec + 1e-9*timespec.tv_nsec; } +/* Because C89 doesn't have a way to check for infinity... */ +static int pygrpc_isinf(double x) { + return x * 0 != 0; +} + gpr_timespec pygrpc_cast_double_to_gpr_timespec(double seconds) { gpr_timespec result; - if isinf(seconds) { + if (pygrpc_isinf(seconds)) { result = seconds > 0.0 ? gpr_inf_future : gpr_inf_past; } else { result.tv_sec = (time_t)seconds; diff --git a/src/python/src/grpc/_adapter/_intermediary_low.py b/src/python/src/grpc/_adapter/_intermediary_low.py index a6e325c4e5..6b96aef1d3 100644 --- a/src/python/src/grpc/_adapter/_intermediary_low.py +++ b/src/python/src/grpc/_adapter/_intermediary_low.py @@ -100,7 +100,7 @@ class _TagAdapter(collections.namedtuple('_TagAdapter', [ class Call(object): """Adapter from old _low.Call interface to new _low.Call.""" - + def __init__(self, channel, completion_queue, method, host, deadline): self._internal = channel._internal.create_call( completion_queue._internal, method, host, deadline) @@ -207,7 +207,7 @@ class CompletionQueue(object): complete_accepted = ev.success if kind == Event.Kind.COMPLETE_ACCEPTED else None service_acceptance = ServiceAcceptance(Call._from_internal(ev.call), ev.call_details.method, ev.call_details.host, ev.call_details.deadline) if kind == Event.Kind.SERVICE_ACCEPTED else None message_bytes = ev.results[0].message if kind == Event.Kind.READ_ACCEPTED else None - status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if ev.results[0].cancelled is not None else None + status = Status(ev.results[0].status.code, ev.results[0].status.details) if (kind == Event.Kind.FINISH and ev.results[0].status) else Status(_types.StatusCode.CANCELLED if ev.results[0].cancelled else _types.StatusCode.OK, '') if len(ev.results) > 0 and ev.results[0].cancelled is not None else None metadata = ev.results[0].initial_metadata if (kind in [Event.Kind.SERVICE_ACCEPTED, Event.Kind.METADATA_ACCEPTED]) else (ev.results[0].trailing_metadata if kind == Event.Kind.FINISH else None) else: raise RuntimeError('unknown event') @@ -241,7 +241,7 @@ class Server(object): return self._internal.request_call(self._internal_cq, _TagAdapter(tag, Event.Kind.SERVICE_ACCEPTED)) def stop(self): - return self._internal.shutdown() + return self._internal.shutdown(_TagAdapter(None, Event.Kind.STOP)) class ClientCredentials(object): @@ -253,6 +253,6 @@ class ClientCredentials(object): class ServerCredentials(object): """Adapter from old _low.ServerCredentials interface to new _low.ServerCredentials.""" - + def __init__(self, root_credentials, pair_sequence): self._internal = _low.ServerCredentials.ssl(root_credentials, list(pair_sequence)) diff --git a/src/python/src/grpc/_adapter/_intermediary_low_test.py b/src/python/src/grpc/_adapter/_intermediary_low_test.py index 6ff51c43a6..478346341b 100644 --- a/src/python/src/grpc/_adapter/_intermediary_low_test.py +++ b/src/python/src/grpc/_adapter/_intermediary_low_test.py @@ -94,14 +94,6 @@ class EchoTest(unittest.TestCase): def tearDown(self): self.server.stop() - # NOTE(nathaniel): Yep, this is weird; it's a consequence of - # grpc_server_destroy's being what has the effect of telling the server's - # completion queue to pump out all pending events/tags immediately rather - # than gracefully completing all outstanding RPCs while accepting no new - # ones. - # TODO(nathaniel): Deallocation of a Python object shouldn't have this kind - # of observable side effect let alone such an important one. - del self.server self.server_completion_queue.stop() self.client_completion_queue.stop() while True: @@ -114,6 +106,7 @@ class EchoTest(unittest.TestCase): break self.server_completion_queue = None self.client_completion_queue = None + del self.server def _perform_echo_test(self, test_data): method = 'test method' @@ -316,7 +309,6 @@ class CancellationTest(unittest.TestCase): def tearDown(self): self.server.stop() - del self.server self.server_completion_queue.stop() self.client_completion_queue.stop() while True: @@ -327,6 +319,7 @@ class CancellationTest(unittest.TestCase): event = self.client_completion_queue.get(0) if event is not None and event.kind is _low.Event.Kind.STOP: break + del self.server def testCancellation(self): method = 'test method' diff --git a/src/python/src/grpc/_adapter/_low.py b/src/python/src/grpc/_adapter/_low.py index 0c1d3b40a5..dcf67dbc11 100644 --- a/src/python/src/grpc/_adapter/_low.py +++ b/src/python/src/grpc/_adapter/_low.py @@ -101,11 +101,8 @@ class Server(_types.Server): def start(self): return self.server.start() - def shutdown(self, tag=_NO_TAG): - if tag is _NO_TAG: - return self.server.shutdown() - else: - return self.server.shutdown(tag) + def shutdown(self, tag=None): + return self.server.shutdown(tag) def request_call(self, completion_queue, tag): return self.server.request_call(completion_queue.completion_queue, tag) diff --git a/src/python/src/grpc/_adapter/_low_test.py b/src/python/src/grpc/_adapter/_low_test.py index e53b176caf..8a9f1a0c49 100644 --- a/src/python/src/grpc/_adapter/_low_test.py +++ b/src/python/src/grpc/_adapter/_low_test.py @@ -48,7 +48,6 @@ class InsecureServerInsecureClient(unittest.TestCase): def tearDown(self): self.server.shutdown() del self.client_channel - del self.server self.client_completion_queue.shutdown() while self.client_completion_queue.next().type != _types.EventType.QUEUE_SHUTDOWN: @@ -59,6 +58,7 @@ class InsecureServerInsecureClient(unittest.TestCase): del self.client_completion_queue del self.server_completion_queue + del self.server def testEcho(self): DEADLINE = time.time()+5 |