diff options
Diffstat (limited to 'src')
30 files changed, 177 insertions, 123 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c index fe7ea6a86b..6e4156c385 100644 --- a/src/core/httpcli/httpcli.c +++ b/src/core/httpcli/httpcli.c @@ -67,7 +67,6 @@ static grpc_httpcli_post_override g_post_override = NULL; static void next_address(internal_request *req); static void finish(internal_request *req, int success) { - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); req->on_response(req->user_data, success ? &req->parser.r : NULL); grpc_httpcli_parser_destroy(&req->parser); if (req->addresses != NULL) { @@ -86,8 +85,6 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices, internal_request *req = user_data; size_t i; - gpr_log(GPR_DEBUG, "%s nslices=%d status=%d", __FUNCTION__, nslices, status); - for (i = 0; i < nslices; i++) { if (GPR_SLICE_LENGTH(slices[i])) { req->have_read_byte = 1; @@ -120,13 +117,11 @@ done: } static void on_written(internal_request *req) { - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); grpc_endpoint_notify_on_read(req->ep, on_read, req); } static void done_write(void *arg, grpc_endpoint_cb_status status) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); switch (status) { case GRPC_ENDPOINT_CB_OK: on_written(req); @@ -141,7 +136,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) { static void start_write(internal_request *req) { gpr_slice_ref(req->request_text); - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); switch ( grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) { case GRPC_ENDPOINT_WRITE_DONE: @@ -159,7 +153,6 @@ static void on_secure_transport_setup_done(void *rp, grpc_security_status status, grpc_endpoint *secure_endpoint) { internal_request *req = rp; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (status != GRPC_SECURITY_OK) { gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status); finish(req, 0); @@ -172,7 +165,6 @@ static void on_secure_transport_setup_done(void *rp, static void on_connected(void *arg, grpc_endpoint *tcp) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (!tcp) { next_address(req); return; @@ -200,7 +192,6 @@ static void on_connected(void *arg, grpc_endpoint *tcp) { static void next_address(internal_request *req) { grpc_resolved_address *addr; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (req->next_address == req->addresses->naddrs) { finish(req, 0); return; @@ -212,7 +203,6 @@ static void next_address(internal_request *req) { static void on_resolved(void *arg, grpc_resolved_addresses *addresses) { internal_request *req = arg; - gpr_log(GPR_DEBUG, "%s", __FUNCTION__); if (!addresses) { finish(req, 0); return; diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c index 740bbe716e..3d202a5cc8 100644 --- a/src/core/iomgr/sockaddr_utils.c +++ b/src/core/iomgr/sockaddr_utils.c @@ -169,8 +169,7 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) { case AF_UNIX: return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family, - __FUNCTION__); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family); return 0; } } @@ -184,8 +183,7 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) { ((struct sockaddr_in6 *)addr)->sin6_port = htons(port); return 1; default: - gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family, - __FUNCTION__); + gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family); return 0; } } diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c index 642520bb47..b4631fa0ed 100644 --- a/src/core/support/subprocess_posix.c +++ b/src/core/support/subprocess_posix.c @@ -57,7 +57,7 @@ struct gpr_subprocess { const char *gpr_subprocess_binary_extension() { return ""; } -gpr_subprocess *gpr_subprocess_create(int argc, char **argv) { +gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { gpr_subprocess *r; int pid; char **exec_args; @@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) { int gpr_subprocess_join(gpr_subprocess *p) { int status; +retry: if (waitpid(p->pid, &status, 0) == -1) { + if (errno == EINTR) { + goto retry; + } gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); return -1; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index ec6fd65ea3..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); + call->write_state = WRITE_STATE_STARTED; } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); @@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + if (!success) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } call->send_ops.nops = 0; call->last_send_contains = 0; @@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: @@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->is_last_send = 1; op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - call->write_state = WRITE_STATE_WRITE_CLOSED; if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; @@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index d75af7291b..24a23ae5c4 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -665,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method, const char *host) { registered_method *m; if (!method) { - gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__); + gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL"); return NULL; } for (m = server->registered_methods; m; m = m->next) { diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 3fd8f67226..a489543868 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p, return 1; } - return parse_begin(p, cur + 1, end); + return p->after_prioritization(p, cur + 1, end); } static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, @@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p, } void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) { - GPR_ASSERT(p->state == parse_begin); + p->after_prioritization = p->state; p->state = parse_stream_dep0; } diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index bb4c1a1f49..bfc06b3980 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser { grpc_chttp2_hpack_parser_state state; /* future states dependent on the opening op code */ const grpc_chttp2_hpack_parser_state *next_state; + /* what to do after skipping prioritization data */ + grpc_chttp2_hpack_parser_state after_prioritization; /* the value we're currently parsing */ union { gpr_uint32 *value; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 62f4020d7e..e66b4ed2d8 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } buf.AddServerSendStatus(&ctx_.trailing_metadata_, status); call_.PerformOps(&buf); - GPR_ASSERT(cq_.Pluck(&buf)); + cq_.Pluck(&buf); /* status ignored */ void* ignored_tag; bool ignored_ok; cq_.Shutdown(); diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js index 3fac193d64..0a86e7eaff 100644 --- a/src/node/examples/math_server.js +++ b/src/node/examples/math_server.js @@ -119,7 +119,7 @@ var server = new Server({ }); if (require.main === module) { - server.bind('0.0.0.0:7070'); + server.bind('0.0.0.0:50051'); server.listen(); } diff --git a/src/node/examples/stock_server.js b/src/node/examples/stock_server.js index e475c9cb4c..8c22af147e 100644 --- a/src/node/examples/stock_server.js +++ b/src/node/examples/stock_server.js @@ -83,7 +83,7 @@ var stockServer = new StockServer({ }); if (require.main === module) { - stockServer.bind('0.0.0.0:8080'); + stockServer.bind('0.0.0.0:50051'); stockServer.listen(); } diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 80f811901c..455055d9f3 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -154,13 +154,15 @@ function serverStreaming(client, done) { arg.response_parameters[resp_index].size); resp_index += 1; }); - call.on('status', function(status) { - assert.strictEqual(status.code, grpc.status.OK); + call.on('end', function() { assert.strictEqual(resp_index, 4); if (done) { done(); } }); + call.on('status', function(status) { + assert.strictEqual(status.code, grpc.status.OK); + }); } /** diff --git a/src/node/src/client.js b/src/node/src/client.js index 46d476b9f4..65339406b2 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); this.call.startBatch(batch, function(err, event) { if (err) { - throw err; + // Something has gone wrong. Stop writing by failing to call callback + return; } callback(); }); @@ -120,10 +121,8 @@ function _read(size) { */ function readCallback(err, event) { if (err) { - throw err; - } - if (self.finished) { - self.push(null); + // Something has gone wrong. Stop reading and wait for status + self.finished = true; return; } var data = event.read; @@ -237,10 +236,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -248,6 +243,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } emitter.emit('metadata', response.metadata); callback(null, deserialize(response.read)); @@ -300,7 +301,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(metadata_batch, function(err, response) { if (err) { - callback(err); + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. return; } stream.emit('metadata', response.metadata); @@ -309,10 +311,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { - if (err) { - callback(err); - return; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -320,6 +318,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; callback(error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + callback(err); + return; + } } callback(null, deserialize(response.read)); }); @@ -373,16 +377,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -390,6 +393,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); @@ -438,16 +447,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(start_batch, function(err, response) { if (err) { - throw err; + // The call has stopped for some reason. A non-OK status will arrive + // in the other batch. + return; } stream.emit('metadata', response.metadata); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { - if (err) { - throw err; - } stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -455,6 +463,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { error.metadata = response.status.metadata; stream.emit('error', error); return; + } else { + if (err) { + // Got a batch error, but OK status. Something went wrong + stream.emit('error', err); + return; + } } }); }); diff --git a/src/objective-c/README.md b/src/objective-c/README.md index cdba3777dd..49d7f43882 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -22,7 +22,7 @@ If you don't want to create the symbolic link, you can alternatively copy the bi Finally, run _protoc_ with the following flags to generate the client library for your `.proto` files: ```sh -protoc --objc_out=. --objcrpc_out=. *.proto +protoc --objc_out=. --objcgrpc_out=. *.proto ``` This will generate a pair of `.pbobjc.h`/`.pbobjc.m` files for each `.proto` file, with the messages and enums defined in them. And a pair of `.pbrpc.h`/`.pbrpc.m` files for each `.proto` file with services defined. The latter contains the code to make remote calls to the specified API. diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile index d30d9c5210..e8b78647ac 100644 --- a/src/objective-c/examples/Sample/Podfile +++ b/src/objective-c/examples/Sample/Podfile @@ -2,7 +2,7 @@ source 'https://github.com/CocoaPods/Specs.git' platform :ios, '8.0' pod 'gRPC', :path => "../../../.." -pod 'Protobuf', :git => 'https://github.com/jcanizales/protobuf.git', :branch => 'add-podspec' +pod 'Protobuf', :git => 'https://github.com/google/protobuf.git' pod 'Route_guide', :path => "RouteGuideClient" pod 'RemoteTest', :path => "RemoteTestClient" diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py index 923e889844..5fa974ed06 100644 --- a/src/python/src/grpc/_adapter/_face_test_case.py +++ b/src/python/src/grpc/_adapter/_face_test_case.py @@ -43,7 +43,7 @@ from grpc.framework.foundation import logging_pool _TIMEOUT = 3 _MAXIMUM_TIMEOUT = 90 -_MAXIMUM_POOL_SIZE = 400 +_MAXIMUM_POOL_SIZE = 4 class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage): diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py index 4987be389a..4fd76f60f8 100644 --- a/src/python/src/grpc/_adapter/_links_test.py +++ b/src/python/src/grpc/_adapter/_links_test.py @@ -54,8 +54,8 @@ def _transform_metadata(unused_metadata): class RoundTripTest(unittest.TestCase): def setUp(self): - self.fore_link_pool = logging_pool.pool(80) - self.rear_link_pool = logging_pool.pool(80) + self.fore_link_pool = logging_pool.pool(8) + self.rear_link_pool = logging_pool.pool(8) def tearDown(self): self.rear_link_pool.shutdown(wait=True) diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py index 25799d679c..bdb1ee2379 100644 --- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py +++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py @@ -43,7 +43,7 @@ _TIMEOUT = 2 class LonelyRearLinkTest(unittest.TestCase): def setUp(self): - self.pool = logging_pool.pool(80) + self.pool = logging_pool.pool(8) def tearDown(self): self.pool.shutdown(wait=True) diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py index 05016cdaf3..69e145e3f6 100644 --- a/src/python/src/grpc/_adapter/fore.py +++ b/src/python/src/grpc/_adapter/fore.py @@ -41,7 +41,7 @@ from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 100 +_THREAD_POOL_SIZE = 10 @enum.unique diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py index dd0a486117..b3b0b4ed32 100644 --- a/src/python/src/grpc/_adapter/rear.py +++ b/src/python/src/grpc/_adapter/rear.py @@ -41,7 +41,7 @@ from grpc.framework.base import null from grpc.framework.foundation import activated from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 100 +_THREAD_POOL_SIZE = 10 _INVOCATION_EVENT_KINDS = ( _low.Event.Kind.METADATA_ACCEPTED, diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py index f3f2a043eb..10919fae69 100644 --- a/src/python/src/grpc/early_adopter/implementations.py +++ b/src/python/src/grpc/early_adopter/implementations.py @@ -41,7 +41,7 @@ from grpc.framework.base import util as _base_utilities from grpc.framework.face import implementations as _face_implementations from grpc.framework.foundation import logging_pool -_THREAD_POOL_SIZE = 80 +_THREAD_POOL_SIZE = 8 _ONE_DAY_IN_SECONDS = 24 * 60 * 60 diff --git a/src/python/src/grpc/framework/base/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py index 11e49caf75..d40bb4d92e 100644 --- a/src/python/src/grpc/framework/base/implementations_test.py +++ b/src/python/src/grpc/framework/base/implementations_test.py @@ -36,7 +36,7 @@ from grpc.framework.base import interfaces_test_case from grpc.framework.base import util from grpc.framework.foundation import logging_pool -POOL_MAX_WORKERS = 100 +POOL_MAX_WORKERS = 10 DEFAULT_TIMEOUT = 30 MAXIMUM_TIMEOUT = 60 diff --git a/src/python/src/grpc/framework/face/_test_case.py b/src/python/src/grpc/framework/face/_test_case.py index b3a012db00..642d500628 100644 --- a/src/python/src/grpc/framework/face/_test_case.py +++ b/src/python/src/grpc/framework/face/_test_case.py @@ -35,7 +35,7 @@ from grpc.framework.face.testing import test_case from grpc.framework.foundation import logging_pool _TIMEOUT = 3 -_MAXIMUM_POOL_SIZE = 100 +_MAXIMUM_POOL_SIZE = 10 class FaceTestCase(test_case.FaceTestCase): diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py index eabeac4569..f6b4b609ff 100644 --- a/src/python/src/grpc/framework/face/demonstration.py +++ b/src/python/src/grpc/framework/face/demonstration.py @@ -34,7 +34,7 @@ from grpc.framework.base import implementations as _base_implementations from grpc.framework.face import implementations from grpc.framework.foundation import logging_pool -_POOL_SIZE_LIMIT = 20 +_POOL_SIZE_LIMIT = 5 _MAXIMUM_TIMEOUT = 90 diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py index 151d0ef793..1df1529b27 100644 --- a/src/python/src/grpc/framework/face/testing/base_util.py +++ b/src/python/src/grpc/framework/face/testing/base_util.py @@ -38,7 +38,7 @@ from grpc.framework.base import in_memory from grpc.framework.base import interfaces # pylint: disable=unused-import from grpc.framework.foundation import logging_pool -_POOL_SIZE_LIMIT = 20 +_POOL_SIZE_LIMIT = 5 _MAXIMUM_TIMEOUT = 90 diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py index 0d51b64f1b..21bf9a4248 100644 --- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py +++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py @@ -44,7 +44,7 @@ from grpc.framework.foundation import future from grpc.framework.foundation import logging_pool _TIMEOUT = 3 -_MAXIMUM_POOL_SIZE = 100 +_MAXIMUM_POOL_SIZE = 10 class _PauseableIterator(object): diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml index ed4a4438b3..c35e970df6 100644 --- a/src/ruby/.rubocop_todo.yml +++ b/src/ruby/.rubocop_todo.yml @@ -1,5 +1,5 @@ # This configuration was generated by `rubocop --auto-gen-config` -# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0. +# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1. # The point is for the user to remove these configuration records # one by one as the offenses are removed from the code base. # Note that changes in the inspected code, or installation of new @@ -7,12 +7,12 @@ # Offense count: 30 Metrics/AbcSize: - Max: 40 + Max: 38 # Offense count: 3 # Configuration parameters: CountComments. Metrics/ClassLength: - Max: 184 + Max: 192 # Offense count: 35 # Configuration parameters: CountComments. diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c index 3182d03fab..837ca3b5e8 100644 --- a/src/ruby/ext/grpc/rb_server.c +++ b/src/ruby/ext/grpc/rb_server.c @@ -282,12 +282,12 @@ static VALUE grpc_rb_server_destroy(VALUE self) { call-seq: // insecure port insecure_server = Server.new(cq, {'arg1': 'value1'}) - insecure_server.add_http2_port('mydomain:7575') + insecure_server.add_http2_port('mydomain:50051') // secure port server_creds = ... secure_server = Server.new(cq, {'arg1': 'value1'}) - secure_server.add_http_port('mydomain:7575', server_creds) + secure_server.add_http_port('mydomain:50051', server_creds) Adds a http2 port to server */ static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) { diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb index 665c144432..dcb11bfbef 100644 --- a/src/ruby/lib/grpc/generic/rpc_server.rb +++ b/src/ruby/lib/grpc/generic/rpc_server.rb @@ -76,7 +76,7 @@ module GRPC @jobs = Queue.new @size = size @stopped = false - @stop_mutex = Mutex.new + @stop_mutex = Mutex.new # needs to be held when accessing @stopped @stop_cond = ConditionVariable.new @workers = [] @keep_alive = keep_alive @@ -92,10 +92,15 @@ module GRPC # @param args the args passed blk when it is called # @param blk the block to call def schedule(*args, &blk) - fail 'already stopped' if @stopped return if blk.nil? - GRPC.logger.info('schedule another job') - @jobs << [blk, args] + @stop_mutex.synchronize do + if @stopped + GRPC.logger.warn('did not schedule job, already stopped') + return + end + GRPC.logger.info('schedule another job') + @jobs << [blk, args] + end end # Starts running the jobs in the thread pool. @@ -116,8 +121,8 @@ module GRPC def stop GRPC.logger.info('stopping, will wait for all the workers to exit') @workers.size.times { schedule { throw :exit } } - @stopped = true @stop_mutex.synchronize do # wait @keep_alive for works to stop + @stopped = true @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 end forcibly_stop_workers @@ -249,15 +254,18 @@ module GRPC server_override:nil, connect_md_proc:nil, **kw) - @cq = RpcServer.setup_cq(completion_queue_override) - @server = RpcServer.setup_srv(server_override, @cq, **kw) @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) - @pool_size = pool_size + @cq = RpcServer.setup_cq(completion_queue_override) @max_waiting_requests = max_waiting_requests @poll_period = poll_period - @run_mutex = Mutex.new - @run_cond = ConditionVariable.new + @pool_size = pool_size @pool = Pool.new(@pool_size) + @run_cond = ConditionVariable.new + @run_mutex = Mutex.new + @running = false + @server = RpcServer.setup_srv(server_override, @cq, **kw) + @stopped = false + @stop_mutex = Mutex.new end # stops a running server @@ -266,20 +274,23 @@ module GRPC # server's current call loop is it's last. def stop return unless @running - @stopped = true + @stop_mutex.synchronize do + @stopped = true + end @pool.stop + @server.close + end - # TODO: uncomment this: - # - # This segfaults in the c layer, so its commented out for now. Shutdown - # still occurs, but the c layer has to do the cleanup. - # - # @server.close + # determines if the server has been stopped + def stopped? + @stop_mutex.synchronize do + return @stopped + end end # determines if the server is currently running def running? - @running ||= false + @running end # Is called from other threads to wait for #run to start up the server. @@ -311,11 +322,6 @@ module GRPC t.join end - # Determines if the server is currently stopped - def stopped? - @stopped ||= false - end - # handle registration of classes # # service is either a class that includes GRPC::GenericService and whose @@ -407,7 +413,13 @@ module GRPC request_call_tag = Object.new until stopped? deadline = from_relative_time(@poll_period) - an_rpc = @server.request_call(@cq, request_call_tag, deadline) + begin + an_rpc = @server.request_call(@cq, request_call_tag, deadline) + rescue Core::CallError, RuntimeError => e + # can happen during server shutdown + GRPC.logger.warn("server call failed: #{e}") + next + end c = new_active_server_call(an_rpc) unless c.nil? mth = an_rpc.method.to_sym diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb index aae3a7d7cb..b67008de48 100644 --- a/src/ruby/spec/generic/rpc_server_pool_spec.rb +++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb @@ -74,11 +74,11 @@ describe GRPC::Pool do end describe '#schedule' do - it 'throws if the pool is already stopped' do + it 'return if the pool is already stopped' do p = Pool.new(1) p.stop job = proc {} - expect { p.schedule(&job) }.to raise_error + expect { p.schedule(&job) }.to_not raise_error end it 'adds jobs that get run by the pool' do diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb index 640b0f656c..e60a8b27c3 100644 --- a/src/ruby/spec/generic/rpc_server_spec.rb +++ b/src/ruby/spec/generic/rpc_server_spec.rb @@ -212,10 +212,14 @@ describe GRPC::RpcServer do describe '#stopped?' do before(:each) do - opts = { a_channel_arg: 'an_arg', poll_period: 1 } + opts = { a_channel_arg: 'an_arg', poll_period: 1.5 } @srv = RpcServer.new(**opts) end + after(:each) do + @srv.stop + end + it 'starts out false' do expect(@srv.stopped?).to be(false) end @@ -225,7 +229,7 @@ describe GRPC::RpcServer do expect(@srv.stopped?).to be(false) end - it 'stays false after the server starts running' do + it 'stays false after the server starts running', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running @@ -234,7 +238,7 @@ describe GRPC::RpcServer do t.join end - it 'is true after a running server is stopped' do + it 'is true after a running server is stopped', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @srv.wait_till_running @@ -251,21 +255,22 @@ describe GRPC::RpcServer do expect(r.running?).to be(false) end - it 'is false after run is called with no services registered' do + it 'is false if run is called with no services registered', server: true do opts = { a_channel_arg: 'an_arg', - poll_period: 1, + poll_period: 2, server_override: @server } r = RpcServer.new(**opts) r.run expect(r.running?).to be(false) + r.stop end it 'is true after run is called with a registered service' do opts = { a_channel_arg: 'an_arg', - poll_period: 1, + poll_period: 2.5, server_override: @server } r = RpcServer.new(**opts) @@ -284,6 +289,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**@opts) end + after(:each) do + @srv.stop + end + it 'raises if #run has already been called' do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -335,6 +344,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should return NOT_FOUND status on unknown methods', server: true do @srv.handle(EchoService) t = Thread.new { @srv.run } @@ -376,7 +389,7 @@ describe GRPC::RpcServer do t.join end - it 'should receive metadata when a deadline is specified', server: true do + it 'should receive metadata if a deadline is specified', server: true do service = SlowService.new @srv.handle(service) t = Thread.new { @srv.run } @@ -445,11 +458,11 @@ describe GRPC::RpcServer do it 'should handle multiple parallel requests', server: true do @srv.handle(EchoService) - Thread.new { @srv.run } + t = Thread.new { @srv.run } @srv.wait_till_running req, q = EchoMsg.new, Queue.new n = 5 # arbitrary - threads = [] + threads = [t] n.times do threads << Thread.new do stub = EchoStub.new(@host, **client_opts) @@ -472,7 +485,7 @@ describe GRPC::RpcServer do } alt_srv = RpcServer.new(**opts) alt_srv.handle(SlowService) - Thread.new { alt_srv.run } + t = Thread.new { alt_srv.run } alt_srv.wait_till_running req = EchoMsg.new n = 5 # arbitrary, use as many to ensure the server pool is exceeded @@ -490,6 +503,7 @@ describe GRPC::RpcServer do end threads.each(&:join) alt_srv.stop + t.join expect(one_failed_as_unavailable).to be(true) end end @@ -513,6 +527,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should send connect metadata to the client', server: true do service = EchoService.new @srv.handle(service) @@ -545,6 +563,10 @@ describe GRPC::RpcServer do @srv = RpcServer.new(**server_opts) end + after(:each) do + @srv.stop + end + it 'should be added to BadStatus when requests fail', server: true do service = FailingService.new @srv.handle(service) |