aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/httpcli/httpcli.c10
-rw-r--r--src/core/iomgr/sockaddr_utils.c6
-rw-r--r--src/core/support/subprocess_posix.c6
-rw-r--r--src/core/surface/call.c56
-rw-r--r--src/core/surface/server.c2
-rw-r--r--src/core/transport/chttp2/hpack_parser.c4
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/cpp/server/server.cc2
-rw-r--r--src/node/examples/math_server.js2
-rw-r--r--src/node/examples/stock_server.js2
-rw-r--r--src/node/interop/interop_client.js6
-rw-r--r--src/node/src/client.js58
-rw-r--r--src/objective-c/README.md2
-rw-r--r--src/objective-c/examples/Sample/Podfile2
-rw-r--r--src/python/src/grpc/_adapter/_face_test_case.py2
-rw-r--r--src/python/src/grpc/_adapter/_links_test.py4
-rw-r--r--src/python/src/grpc/_adapter/_lonely_rear_link_test.py2
-rw-r--r--src/python/src/grpc/_adapter/fore.py2
-rw-r--r--src/python/src/grpc/_adapter/rear.py2
-rw-r--r--src/python/src/grpc/early_adopter/implementations.py2
-rw-r--r--src/python/src/grpc/framework/base/implementations_test.py2
-rw-r--r--src/python/src/grpc/framework/face/_test_case.py2
-rw-r--r--src/python/src/grpc/framework/face/demonstration.py2
-rw-r--r--src/python/src/grpc/framework/face/testing/base_util.py2
-rw-r--r--src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py2
-rw-r--r--src/ruby/.rubocop_todo.yml6
-rw-r--r--src/ruby/ext/grpc/rb_server.c4
-rw-r--r--src/ruby/lib/grpc/generic/rpc_server.rb60
-rw-r--r--src/ruby/spec/generic/rpc_server_pool_spec.rb4
-rw-r--r--src/ruby/spec/generic/rpc_server_spec.rb42
30 files changed, 177 insertions, 123 deletions
diff --git a/src/core/httpcli/httpcli.c b/src/core/httpcli/httpcli.c
index fe7ea6a86b..6e4156c385 100644
--- a/src/core/httpcli/httpcli.c
+++ b/src/core/httpcli/httpcli.c
@@ -67,7 +67,6 @@ static grpc_httpcli_post_override g_post_override = NULL;
static void next_address(internal_request *req);
static void finish(internal_request *req, int success) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
req->on_response(req->user_data, success ? &req->parser.r : NULL);
grpc_httpcli_parser_destroy(&req->parser);
if (req->addresses != NULL) {
@@ -86,8 +85,6 @@ static void on_read(void *user_data, gpr_slice *slices, size_t nslices,
internal_request *req = user_data;
size_t i;
- gpr_log(GPR_DEBUG, "%s nslices=%d status=%d", __FUNCTION__, nslices, status);
-
for (i = 0; i < nslices; i++) {
if (GPR_SLICE_LENGTH(slices[i])) {
req->have_read_byte = 1;
@@ -120,13 +117,11 @@ done:
}
static void on_written(internal_request *req) {
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
grpc_endpoint_notify_on_read(req->ep, on_read, req);
}
static void done_write(void *arg, grpc_endpoint_cb_status status) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (status) {
case GRPC_ENDPOINT_CB_OK:
on_written(req);
@@ -141,7 +136,6 @@ static void done_write(void *arg, grpc_endpoint_cb_status status) {
static void start_write(internal_request *req) {
gpr_slice_ref(req->request_text);
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
switch (
grpc_endpoint_write(req->ep, &req->request_text, 1, done_write, req)) {
case GRPC_ENDPOINT_WRITE_DONE:
@@ -159,7 +153,6 @@ static void on_secure_transport_setup_done(void *rp,
grpc_security_status status,
grpc_endpoint *secure_endpoint) {
internal_request *req = rp;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (status != GRPC_SECURITY_OK) {
gpr_log(GPR_ERROR, "Secure transport setup failed with error %d.", status);
finish(req, 0);
@@ -172,7 +165,6 @@ static void on_secure_transport_setup_done(void *rp,
static void on_connected(void *arg, grpc_endpoint *tcp) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!tcp) {
next_address(req);
return;
@@ -200,7 +192,6 @@ static void on_connected(void *arg, grpc_endpoint *tcp) {
static void next_address(internal_request *req) {
grpc_resolved_address *addr;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (req->next_address == req->addresses->naddrs) {
finish(req, 0);
return;
@@ -212,7 +203,6 @@ static void next_address(internal_request *req) {
static void on_resolved(void *arg, grpc_resolved_addresses *addresses) {
internal_request *req = arg;
- gpr_log(GPR_DEBUG, "%s", __FUNCTION__);
if (!addresses) {
finish(req, 0);
return;
diff --git a/src/core/iomgr/sockaddr_utils.c b/src/core/iomgr/sockaddr_utils.c
index 740bbe716e..3d202a5cc8 100644
--- a/src/core/iomgr/sockaddr_utils.c
+++ b/src/core/iomgr/sockaddr_utils.c
@@ -169,8 +169,7 @@ int grpc_sockaddr_get_port(const struct sockaddr *addr) {
case AF_UNIX:
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_get_port", addr->sa_family);
return 0;
}
}
@@ -184,8 +183,7 @@ int grpc_sockaddr_set_port(const struct sockaddr *addr, int port) {
((struct sockaddr_in6 *)addr)->sin6_port = htons(port);
return 1;
default:
- gpr_log(GPR_ERROR, "Unknown socket family %d in %s", addr->sa_family,
- __FUNCTION__);
+ gpr_log(GPR_ERROR, "Unknown socket family %d in grpc_sockaddr_set_port", addr->sa_family);
return 0;
}
}
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c
index 642520bb47..b4631fa0ed 100644
--- a/src/core/support/subprocess_posix.c
+++ b/src/core/support/subprocess_posix.c
@@ -57,7 +57,7 @@ struct gpr_subprocess {
const char *gpr_subprocess_binary_extension() { return ""; }
-gpr_subprocess *gpr_subprocess_create(int argc, char **argv) {
+gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) {
gpr_subprocess *r;
int pid;
char **exec_args;
@@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) {
int gpr_subprocess_join(gpr_subprocess *p) {
int status;
+retry:
if (waitpid(p->pid, &status, 0) == -1) {
+ if (errno == EINTR) {
+ goto retry;
+ }
gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno));
return -1;
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ec6fd65ea3..4d2ba7cd7d 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->success) {
- call->request_set[i] = REQSET_EMPTY;
- } else {
+ call->request_set[i] = REQSET_EMPTY;
+ if (!master->success) {
call->write_state = WRITE_STATE_WRITE_CLOSED;
}
break;
@@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
}
}
+static void early_out_write_ops(grpc_call *call) {
+ switch (call->write_state) {
+ case WRITE_STATE_WRITE_CLOSED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ /* fallthrough */
+ case WRITE_STATE_STARTED:
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
+ /* fallthrough */
+ case WRITE_STATE_INITIAL:
+ /* do nothing */
+ break;
+ }
+}
+
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
+ call->write_state = WRITE_STATE_STARTED;
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
@@ -596,6 +613,11 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ }
+ if (!success) {
+ call->write_state = WRITE_STATE_WRITE_CLOSED;
+ early_out_write_ops(call);
}
call->send_ops.nops = 0;
call->last_send_contains = 0;
@@ -811,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->send_ops = &call->send_ops;
op->bind_pollset = grpc_cq_pollset(call->cq);
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
- call->write_state = WRITE_STATE_STARTED;
call->send_initial_metadata_count = 0;
/* fall through intended */
case WRITE_STATE_STARTED:
@@ -827,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) {
op->is_last_send = 1;
op->send_ops = &call->send_ops;
call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
- call->write_state = WRITE_STATE_WRITE_CLOSED;
if (!call->is_client) {
/* send trailing metadata */
data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
@@ -919,23 +939,6 @@ static void finish_read_ops(grpc_call *call) {
}
}
-static void early_out_write_ops(grpc_call *call) {
- switch (call->write_state) {
- case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
- /* fallthrough */
- case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
- /* fallthrough */
- case WRITE_STATE_INITIAL:
- /* do nothing */
- break;
- }
-}
-
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
size_t nreqs,
grpc_ioreq_completion_func completion,
@@ -1176,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, success);
+}
+
+static void finish_batch_with_close(grpc_call *call, int success, void *tag) {
grpc_cq_end_op(call->cq, tag, call, 1);
}
@@ -1186,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
size_t out;
const grpc_op *op;
grpc_ioreq *req;
+ void (*finish_func)(grpc_call *, int, void *) = finish_batch;
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
@@ -1269,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_status_on_client.trailing_metadata;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
case GRPC_OP_RECV_CLOSE_ON_SERVER:
req = &reqs[out++];
@@ -1278,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
op->data.recv_close_on_server.cancelled;
req = &reqs[out++];
req->op = GRPC_IOREQ_RECV_CLOSE;
+ finish_func = finish_batch_with_close;
break;
}
}
grpc_cq_begin_op(call->cq, call);
- return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
+ return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func,
tag);
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index d75af7291b..24a23ae5c4 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -665,7 +665,7 @@ void *grpc_server_register_method(grpc_server *server, const char *method,
const char *host) {
registered_method *m;
if (!method) {
- gpr_log(GPR_ERROR, "%s method string cannot be NULL", __FUNCTION__);
+ gpr_log(GPR_ERROR, "grpc_server_register_method method string cannot be NULL");
return NULL;
}
for (m = server->registered_methods; m; m = m->next) {
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index 3fd8f67226..a489543868 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p,
return 1;
}
- return parse_begin(p, cur + 1, end);
+ return p->after_prioritization(p, cur + 1, end);
}
static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur,
@@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p,
}
void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) {
- GPR_ASSERT(p->state == parse_begin);
+ p->after_prioritization = p->state;
p->state = parse_stream_dep0;
}
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index bb4c1a1f49..bfc06b3980 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser {
grpc_chttp2_hpack_parser_state state;
/* future states dependent on the opening op code */
const grpc_chttp2_hpack_parser_state *next_state;
+ /* what to do after skipping prioritization data */
+ grpc_chttp2_hpack_parser_state after_prioritization;
/* the value we're currently parsing */
union {
gpr_uint32 *value;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 62f4020d7e..e66b4ed2d8 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -149,7 +149,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
buf.AddServerSendStatus(&ctx_.trailing_metadata_, status);
call_.PerformOps(&buf);
- GPR_ASSERT(cq_.Pluck(&buf));
+ cq_.Pluck(&buf); /* status ignored */
void* ignored_tag;
bool ignored_ok;
cq_.Shutdown();
diff --git a/src/node/examples/math_server.js b/src/node/examples/math_server.js
index 3fac193d64..0a86e7eaff 100644
--- a/src/node/examples/math_server.js
+++ b/src/node/examples/math_server.js
@@ -119,7 +119,7 @@ var server = new Server({
});
if (require.main === module) {
- server.bind('0.0.0.0:7070');
+ server.bind('0.0.0.0:50051');
server.listen();
}
diff --git a/src/node/examples/stock_server.js b/src/node/examples/stock_server.js
index e475c9cb4c..8c22af147e 100644
--- a/src/node/examples/stock_server.js
+++ b/src/node/examples/stock_server.js
@@ -83,7 +83,7 @@ var stockServer = new StockServer({
});
if (require.main === module) {
- stockServer.bind('0.0.0.0:8080');
+ stockServer.bind('0.0.0.0:50051');
stockServer.listen();
}
diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js
index 80f811901c..455055d9f3 100644
--- a/src/node/interop/interop_client.js
+++ b/src/node/interop/interop_client.js
@@ -154,13 +154,15 @@ function serverStreaming(client, done) {
arg.response_parameters[resp_index].size);
resp_index += 1;
});
- call.on('status', function(status) {
- assert.strictEqual(status.code, grpc.status.OK);
+ call.on('end', function() {
assert.strictEqual(resp_index, 4);
if (done) {
done();
}
});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, grpc.status.OK);
+ });
}
/**
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 46d476b9f4..65339406b2 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -81,7 +81,8 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
this.call.startBatch(batch, function(err, event) {
if (err) {
- throw err;
+ // Something has gone wrong. Stop writing by failing to call callback
+ return;
}
callback();
});
@@ -120,10 +121,8 @@ function _read(size) {
*/
function readCallback(err, event) {
if (err) {
- throw err;
- }
- if (self.finished) {
- self.push(null);
+ // Something has gone wrong. Stop reading and wait for status
+ self.finished = true;
return;
}
var data = event.read;
@@ -237,10 +236,6 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
emitter.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -248,6 +243,12 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
emitter.emit('metadata', response.metadata);
callback(null, deserialize(response.read));
@@ -300,7 +301,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(metadata_batch, function(err, response) {
if (err) {
- callback(err);
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
return;
}
stream.emit('metadata', response.metadata);
@@ -309,10 +311,6 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
client_batch[grpc.opType.RECV_MESSAGE] = true;
client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(client_batch, function(err, response) {
- if (err) {
- callback(err);
- return;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -320,6 +318,12 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
callback(error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ callback(err);
+ return;
+ }
}
callback(null, deserialize(response.read));
});
@@ -373,16 +377,15 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -390,6 +393,12 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});
@@ -438,16 +447,15 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
- throw err;
+ // The call has stopped for some reason. A non-OK status will arrive
+ // in the other batch.
+ return;
}
stream.emit('metadata', response.metadata);
});
var status_batch = {};
status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true;
call.startBatch(status_batch, function(err, response) {
- if (err) {
- throw err;
- }
stream.emit('status', response.status);
if (response.status.code !== grpc.status.OK) {
var error = new Error(response.status.details);
@@ -455,6 +463,12 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) {
error.metadata = response.status.metadata;
stream.emit('error', error);
return;
+ } else {
+ if (err) {
+ // Got a batch error, but OK status. Something went wrong
+ stream.emit('error', err);
+ return;
+ }
}
});
});
diff --git a/src/objective-c/README.md b/src/objective-c/README.md
index cdba3777dd..49d7f43882 100644
--- a/src/objective-c/README.md
+++ b/src/objective-c/README.md
@@ -22,7 +22,7 @@ If you don't want to create the symbolic link, you can alternatively copy the bi
Finally, run _protoc_ with the following flags to generate the client library for your `.proto` files:
```sh
-protoc --objc_out=. --objcrpc_out=. *.proto
+protoc --objc_out=. --objcgrpc_out=. *.proto
```
This will generate a pair of `.pbobjc.h`/`.pbobjc.m` files for each `.proto` file, with the messages and enums defined in them. And a pair of `.pbrpc.h`/`.pbrpc.m` files for each `.proto` file with services defined. The latter contains the code to make remote calls to the specified API.
diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile
index d30d9c5210..e8b78647ac 100644
--- a/src/objective-c/examples/Sample/Podfile
+++ b/src/objective-c/examples/Sample/Podfile
@@ -2,7 +2,7 @@ source 'https://github.com/CocoaPods/Specs.git'
platform :ios, '8.0'
pod 'gRPC', :path => "../../../.."
-pod 'Protobuf', :git => 'https://github.com/jcanizales/protobuf.git', :branch => 'add-podspec'
+pod 'Protobuf', :git => 'https://github.com/google/protobuf.git'
pod 'Route_guide', :path => "RouteGuideClient"
pod 'RemoteTest', :path => "RemoteTestClient"
diff --git a/src/python/src/grpc/_adapter/_face_test_case.py b/src/python/src/grpc/_adapter/_face_test_case.py
index 923e889844..5fa974ed06 100644
--- a/src/python/src/grpc/_adapter/_face_test_case.py
+++ b/src/python/src/grpc/_adapter/_face_test_case.py
@@ -43,7 +43,7 @@ from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
_MAXIMUM_TIMEOUT = 90
-_MAXIMUM_POOL_SIZE = 400
+_MAXIMUM_POOL_SIZE = 4
class FaceTestCase(test_case.FaceTestCase, coverage.BlockingCoverage):
diff --git a/src/python/src/grpc/_adapter/_links_test.py b/src/python/src/grpc/_adapter/_links_test.py
index 4987be389a..4fd76f60f8 100644
--- a/src/python/src/grpc/_adapter/_links_test.py
+++ b/src/python/src/grpc/_adapter/_links_test.py
@@ -54,8 +54,8 @@ def _transform_metadata(unused_metadata):
class RoundTripTest(unittest.TestCase):
def setUp(self):
- self.fore_link_pool = logging_pool.pool(80)
- self.rear_link_pool = logging_pool.pool(80)
+ self.fore_link_pool = logging_pool.pool(8)
+ self.rear_link_pool = logging_pool.pool(8)
def tearDown(self):
self.rear_link_pool.shutdown(wait=True)
diff --git a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
index 25799d679c..bdb1ee2379 100644
--- a/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
+++ b/src/python/src/grpc/_adapter/_lonely_rear_link_test.py
@@ -43,7 +43,7 @@ _TIMEOUT = 2
class LonelyRearLinkTest(unittest.TestCase):
def setUp(self):
- self.pool = logging_pool.pool(80)
+ self.pool = logging_pool.pool(8)
def tearDown(self):
self.pool.shutdown(wait=True)
diff --git a/src/python/src/grpc/_adapter/fore.py b/src/python/src/grpc/_adapter/fore.py
index 05016cdaf3..69e145e3f6 100644
--- a/src/python/src/grpc/_adapter/fore.py
+++ b/src/python/src/grpc/_adapter/fore.py
@@ -41,7 +41,7 @@ from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 100
+_THREAD_POOL_SIZE = 10
@enum.unique
diff --git a/src/python/src/grpc/_adapter/rear.py b/src/python/src/grpc/_adapter/rear.py
index dd0a486117..b3b0b4ed32 100644
--- a/src/python/src/grpc/_adapter/rear.py
+++ b/src/python/src/grpc/_adapter/rear.py
@@ -41,7 +41,7 @@ from grpc.framework.base import null
from grpc.framework.foundation import activated
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 100
+_THREAD_POOL_SIZE = 10
_INVOCATION_EVENT_KINDS = (
_low.Event.Kind.METADATA_ACCEPTED,
diff --git a/src/python/src/grpc/early_adopter/implementations.py b/src/python/src/grpc/early_adopter/implementations.py
index f3f2a043eb..10919fae69 100644
--- a/src/python/src/grpc/early_adopter/implementations.py
+++ b/src/python/src/grpc/early_adopter/implementations.py
@@ -41,7 +41,7 @@ from grpc.framework.base import util as _base_utilities
from grpc.framework.face import implementations as _face_implementations
from grpc.framework.foundation import logging_pool
-_THREAD_POOL_SIZE = 80
+_THREAD_POOL_SIZE = 8
_ONE_DAY_IN_SECONDS = 24 * 60 * 60
diff --git a/src/python/src/grpc/framework/base/implementations_test.py b/src/python/src/grpc/framework/base/implementations_test.py
index 11e49caf75..d40bb4d92e 100644
--- a/src/python/src/grpc/framework/base/implementations_test.py
+++ b/src/python/src/grpc/framework/base/implementations_test.py
@@ -36,7 +36,7 @@ from grpc.framework.base import interfaces_test_case
from grpc.framework.base import util
from grpc.framework.foundation import logging_pool
-POOL_MAX_WORKERS = 100
+POOL_MAX_WORKERS = 10
DEFAULT_TIMEOUT = 30
MAXIMUM_TIMEOUT = 60
diff --git a/src/python/src/grpc/framework/face/_test_case.py b/src/python/src/grpc/framework/face/_test_case.py
index b3a012db00..642d500628 100644
--- a/src/python/src/grpc/framework/face/_test_case.py
+++ b/src/python/src/grpc/framework/face/_test_case.py
@@ -35,7 +35,7 @@ from grpc.framework.face.testing import test_case
from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
-_MAXIMUM_POOL_SIZE = 100
+_MAXIMUM_POOL_SIZE = 10
class FaceTestCase(test_case.FaceTestCase):
diff --git a/src/python/src/grpc/framework/face/demonstration.py b/src/python/src/grpc/framework/face/demonstration.py
index eabeac4569..f6b4b609ff 100644
--- a/src/python/src/grpc/framework/face/demonstration.py
+++ b/src/python/src/grpc/framework/face/demonstration.py
@@ -34,7 +34,7 @@ from grpc.framework.base import implementations as _base_implementations
from grpc.framework.face import implementations
from grpc.framework.foundation import logging_pool
-_POOL_SIZE_LIMIT = 20
+_POOL_SIZE_LIMIT = 5
_MAXIMUM_TIMEOUT = 90
diff --git a/src/python/src/grpc/framework/face/testing/base_util.py b/src/python/src/grpc/framework/face/testing/base_util.py
index 151d0ef793..1df1529b27 100644
--- a/src/python/src/grpc/framework/face/testing/base_util.py
+++ b/src/python/src/grpc/framework/face/testing/base_util.py
@@ -38,7 +38,7 @@ from grpc.framework.base import in_memory
from grpc.framework.base import interfaces # pylint: disable=unused-import
from grpc.framework.foundation import logging_pool
-_POOL_SIZE_LIMIT = 20
+_POOL_SIZE_LIMIT = 5
_MAXIMUM_TIMEOUT = 90
diff --git a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
index 0d51b64f1b..21bf9a4248 100644
--- a/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
+++ b/src/python/src/grpc/framework/face/testing/future_invocation_asynchronous_event_service_test_case.py
@@ -44,7 +44,7 @@ from grpc.framework.foundation import future
from grpc.framework.foundation import logging_pool
_TIMEOUT = 3
-_MAXIMUM_POOL_SIZE = 100
+_MAXIMUM_POOL_SIZE = 10
class _PauseableIterator(object):
diff --git a/src/ruby/.rubocop_todo.yml b/src/ruby/.rubocop_todo.yml
index ed4a4438b3..c35e970df6 100644
--- a/src/ruby/.rubocop_todo.yml
+++ b/src/ruby/.rubocop_todo.yml
@@ -1,5 +1,5 @@
# This configuration was generated by `rubocop --auto-gen-config`
-# on 2015-04-17 14:43:27 -0700 using RuboCop version 0.30.0.
+# on 2015-05-22 13:23:34 -0700 using RuboCop version 0.30.1.
# The point is for the user to remove these configuration records
# one by one as the offenses are removed from the code base.
# Note that changes in the inspected code, or installation of new
@@ -7,12 +7,12 @@
# Offense count: 30
Metrics/AbcSize:
- Max: 40
+ Max: 38
# Offense count: 3
# Configuration parameters: CountComments.
Metrics/ClassLength:
- Max: 184
+ Max: 192
# Offense count: 35
# Configuration parameters: CountComments.
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 3182d03fab..837ca3b5e8 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -282,12 +282,12 @@ static VALUE grpc_rb_server_destroy(VALUE self) {
call-seq:
// insecure port
insecure_server = Server.new(cq, {'arg1': 'value1'})
- insecure_server.add_http2_port('mydomain:7575')
+ insecure_server.add_http2_port('mydomain:50051')
// secure port
server_creds = ...
secure_server = Server.new(cq, {'arg1': 'value1'})
- secure_server.add_http_port('mydomain:7575', server_creds)
+ secure_server.add_http_port('mydomain:50051', server_creds)
Adds a http2 port to server */
static VALUE grpc_rb_server_add_http2_port(int argc, VALUE *argv, VALUE self) {
diff --git a/src/ruby/lib/grpc/generic/rpc_server.rb b/src/ruby/lib/grpc/generic/rpc_server.rb
index 665c144432..dcb11bfbef 100644
--- a/src/ruby/lib/grpc/generic/rpc_server.rb
+++ b/src/ruby/lib/grpc/generic/rpc_server.rb
@@ -76,7 +76,7 @@ module GRPC
@jobs = Queue.new
@size = size
@stopped = false
- @stop_mutex = Mutex.new
+ @stop_mutex = Mutex.new # needs to be held when accessing @stopped
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
@@ -92,10 +92,15 @@ module GRPC
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
- fail 'already stopped' if @stopped
return if blk.nil?
- GRPC.logger.info('schedule another job')
- @jobs << [blk, args]
+ @stop_mutex.synchronize do
+ if @stopped
+ GRPC.logger.warn('did not schedule job, already stopped')
+ return
+ end
+ GRPC.logger.info('schedule another job')
+ @jobs << [blk, args]
+ end
end
# Starts running the jobs in the thread pool.
@@ -116,8 +121,8 @@ module GRPC
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
@workers.size.times { schedule { throw :exit } }
- @stopped = true
@stop_mutex.synchronize do # wait @keep_alive for works to stop
+ @stopped = true
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
@@ -249,15 +254,18 @@ module GRPC
server_override:nil,
connect_md_proc:nil,
**kw)
- @cq = RpcServer.setup_cq(completion_queue_override)
- @server = RpcServer.setup_srv(server_override, @cq, **kw)
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
- @pool_size = pool_size
+ @cq = RpcServer.setup_cq(completion_queue_override)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
- @run_mutex = Mutex.new
- @run_cond = ConditionVariable.new
+ @pool_size = pool_size
@pool = Pool.new(@pool_size)
+ @run_cond = ConditionVariable.new
+ @run_mutex = Mutex.new
+ @running = false
+ @server = RpcServer.setup_srv(server_override, @cq, **kw)
+ @stopped = false
+ @stop_mutex = Mutex.new
end
# stops a running server
@@ -266,20 +274,23 @@ module GRPC
# server's current call loop is it's last.
def stop
return unless @running
- @stopped = true
+ @stop_mutex.synchronize do
+ @stopped = true
+ end
@pool.stop
+ @server.close
+ end
- # TODO: uncomment this:
- #
- # This segfaults in the c layer, so its commented out for now. Shutdown
- # still occurs, but the c layer has to do the cleanup.
- #
- # @server.close
+ # determines if the server has been stopped
+ def stopped?
+ @stop_mutex.synchronize do
+ return @stopped
+ end
end
# determines if the server is currently running
def running?
- @running ||= false
+ @running
end
# Is called from other threads to wait for #run to start up the server.
@@ -311,11 +322,6 @@ module GRPC
t.join
end
- # Determines if the server is currently stopped
- def stopped?
- @stopped ||= false
- end
-
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
@@ -407,7 +413,13 @@ module GRPC
request_call_tag = Object.new
until stopped?
deadline = from_relative_time(@poll_period)
- an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ begin
+ an_rpc = @server.request_call(@cq, request_call_tag, deadline)
+ rescue Core::CallError, RuntimeError => e
+ # can happen during server shutdown
+ GRPC.logger.warn("server call failed: #{e}")
+ next
+ end
c = new_active_server_call(an_rpc)
unless c.nil?
mth = an_rpc.method.to_sym
diff --git a/src/ruby/spec/generic/rpc_server_pool_spec.rb b/src/ruby/spec/generic/rpc_server_pool_spec.rb
index aae3a7d7cb..b67008de48 100644
--- a/src/ruby/spec/generic/rpc_server_pool_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_pool_spec.rb
@@ -74,11 +74,11 @@ describe GRPC::Pool do
end
describe '#schedule' do
- it 'throws if the pool is already stopped' do
+ it 'return if the pool is already stopped' do
p = Pool.new(1)
p.stop
job = proc {}
- expect { p.schedule(&job) }.to raise_error
+ expect { p.schedule(&job) }.to_not raise_error
end
it 'adds jobs that get run by the pool' do
diff --git a/src/ruby/spec/generic/rpc_server_spec.rb b/src/ruby/spec/generic/rpc_server_spec.rb
index 640b0f656c..e60a8b27c3 100644
--- a/src/ruby/spec/generic/rpc_server_spec.rb
+++ b/src/ruby/spec/generic/rpc_server_spec.rb
@@ -212,10 +212,14 @@ describe GRPC::RpcServer do
describe '#stopped?' do
before(:each) do
- opts = { a_channel_arg: 'an_arg', poll_period: 1 }
+ opts = { a_channel_arg: 'an_arg', poll_period: 1.5 }
@srv = RpcServer.new(**opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'starts out false' do
expect(@srv.stopped?).to be(false)
end
@@ -225,7 +229,7 @@ describe GRPC::RpcServer do
expect(@srv.stopped?).to be(false)
end
- it 'stays false after the server starts running' do
+ it 'stays false after the server starts running', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -234,7 +238,7 @@ describe GRPC::RpcServer do
t.join
end
- it 'is true after a running server is stopped' do
+ it 'is true after a running server is stopped', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@srv.wait_till_running
@@ -251,21 +255,22 @@ describe GRPC::RpcServer do
expect(r.running?).to be(false)
end
- it 'is false after run is called with no services registered' do
+ it 'is false if run is called with no services registered', server: true do
opts = {
a_channel_arg: 'an_arg',
- poll_period: 1,
+ poll_period: 2,
server_override: @server
}
r = RpcServer.new(**opts)
r.run
expect(r.running?).to be(false)
+ r.stop
end
it 'is true after run is called with a registered service' do
opts = {
a_channel_arg: 'an_arg',
- poll_period: 1,
+ poll_period: 2.5,
server_override: @server
}
r = RpcServer.new(**opts)
@@ -284,6 +289,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**@opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'raises if #run has already been called' do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -335,6 +344,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should return NOT_FOUND status on unknown methods', server: true do
@srv.handle(EchoService)
t = Thread.new { @srv.run }
@@ -376,7 +389,7 @@ describe GRPC::RpcServer do
t.join
end
- it 'should receive metadata when a deadline is specified', server: true do
+ it 'should receive metadata if a deadline is specified', server: true do
service = SlowService.new
@srv.handle(service)
t = Thread.new { @srv.run }
@@ -445,11 +458,11 @@ describe GRPC::RpcServer do
it 'should handle multiple parallel requests', server: true do
@srv.handle(EchoService)
- Thread.new { @srv.run }
+ t = Thread.new { @srv.run }
@srv.wait_till_running
req, q = EchoMsg.new, Queue.new
n = 5 # arbitrary
- threads = []
+ threads = [t]
n.times do
threads << Thread.new do
stub = EchoStub.new(@host, **client_opts)
@@ -472,7 +485,7 @@ describe GRPC::RpcServer do
}
alt_srv = RpcServer.new(**opts)
alt_srv.handle(SlowService)
- Thread.new { alt_srv.run }
+ t = Thread.new { alt_srv.run }
alt_srv.wait_till_running
req = EchoMsg.new
n = 5 # arbitrary, use as many to ensure the server pool is exceeded
@@ -490,6 +503,7 @@ describe GRPC::RpcServer do
end
threads.each(&:join)
alt_srv.stop
+ t.join
expect(one_failed_as_unavailable).to be(true)
end
end
@@ -513,6 +527,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should send connect metadata to the client', server: true do
service = EchoService.new
@srv.handle(service)
@@ -545,6 +563,10 @@ describe GRPC::RpcServer do
@srv = RpcServer.new(**server_opts)
end
+ after(:each) do
+ @srv.stop
+ end
+
it 'should be added to BadStatus when requests fail', server: true do
service = FailingService.new
@srv.handle(service)