diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/iomgr/endpoint_pair_posix.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.c | 39 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.h | 2 | ||||
-rw-r--r-- | src/node/src/server.js | 139 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 48 |
5 files changed, 145 insertions, 86 deletions
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c index deae9c6875..56f6f146fd 100644 --- a/src/core/iomgr/endpoint_pair_posix.c +++ b/src/core/iomgr/endpoint_pair_posix.c @@ -36,6 +36,7 @@ #ifdef GPR_POSIX_SOCKET #include "src/core/iomgr/endpoint_pair.h" +#include "src/core/iomgr/socket_utils_posix.h" #include <errno.h> #include <fcntl.h> @@ -56,6 +57,8 @@ static void create_sockets(int sv[2]) { GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0); flags = fcntl(sv[1], F_GETFL, 0); GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0])); + GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1])); } grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name, diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index d38ff68754..e5453000ec 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -728,6 +728,7 @@ static int finish_indexed_field(grpc_chttp2_hpack_parser *p, /* parse an indexed field with index < 127 */ static int parse_indexed_field(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { + p->dynamic_table_update_allowed = 0; p->index = (*cur) & 0x7f; return finish_indexed_field(p, cur + 1, end); } @@ -737,6 +738,7 @@ static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { static const grpc_chttp2_hpack_parser_state and_then[] = { finish_indexed_field}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = 0x7f; p->parsing.value = &p->index; @@ -748,6 +750,7 @@ static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p, static int finish_lithdr_incidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); + GPR_ASSERT(md != NULL); /* handled in string parsing */ return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 1) && @@ -768,6 +771,7 @@ static int parse_lithdr_incidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { static const grpc_chttp2_hpack_parser_state and_then[] = { parse_value_string_with_indexed_key, finish_lithdr_incidx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = (*cur) & 0x3f; return parse_string_prefix(p, cur + 1, end); @@ -779,6 +783,7 @@ static int parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_string_prefix, parse_value_string_with_indexed_key, finish_lithdr_incidx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = 0x3f; p->parsing.value = &p->index; @@ -791,6 +796,7 @@ static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_key_string, parse_string_prefix, parse_value_string_with_literal_key, finish_lithdr_incidx_v}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; return parse_string_prefix(p, cur + 1, end); } @@ -799,6 +805,7 @@ static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p, static int finish_lithdr_notidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); + GPR_ASSERT(md != NULL); /* handled in string parsing */ return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 0) && @@ -819,6 +826,7 @@ static int parse_lithdr_notidx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { static const grpc_chttp2_hpack_parser_state and_then[] = { parse_value_string_with_indexed_key, finish_lithdr_notidx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = (*cur) & 0xf; return parse_string_prefix(p, cur + 1, end); @@ -830,6 +838,7 @@ static int parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_string_prefix, parse_value_string_with_indexed_key, finish_lithdr_notidx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = 0xf; p->parsing.value = &p->index; @@ -842,6 +851,7 @@ static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_key_string, parse_string_prefix, parse_value_string_with_literal_key, finish_lithdr_notidx_v}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; return parse_string_prefix(p, cur + 1, end); } @@ -850,6 +860,7 @@ static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p, static int finish_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index); + GPR_ASSERT(md != NULL); /* handled in string parsing */ return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key), take_string(p, &p->value)), 0) && @@ -870,6 +881,7 @@ static int parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { static const grpc_chttp2_hpack_parser_state and_then[] = { parse_value_string_with_indexed_key, finish_lithdr_nvridx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = (*cur) & 0xf; return parse_string_prefix(p, cur + 1, end); @@ -881,6 +893,7 @@ static int parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_string_prefix, parse_value_string_with_indexed_key, finish_lithdr_nvridx}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; p->index = 0xf; p->parsing.value = &p->index; @@ -893,6 +906,7 @@ static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p, static const grpc_chttp2_hpack_parser_state and_then[] = { parse_key_string, parse_string_prefix, parse_value_string_with_literal_key, finish_lithdr_nvridx_v}; + p->dynamic_table_update_allowed = 0; p->next_state = and_then; return parse_string_prefix(p, cur + 1, end); } @@ -908,6 +922,10 @@ static int finish_max_tbl_size(grpc_chttp2_hpack_parser *p, /* parse a max table size change, max size < 15 */ static int parse_max_tbl_size(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { + if (p->dynamic_table_update_allowed == 0) { + return 0; + } + p->dynamic_table_update_allowed--; p->index = (*cur) & 0x1f; return finish_max_tbl_size(p, cur + 1, end); } @@ -917,6 +935,10 @@ static int parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, const gpr_uint8 *end) { static const grpc_chttp2_hpack_parser_state and_then[] = { finish_max_tbl_size}; + if (p->dynamic_table_update_allowed == 0) { + return 0; + } + p->dynamic_table_update_allowed--; p->next_state = and_then; p->index = 0x1f; p->parsing.value = &p->index; @@ -1300,7 +1322,10 @@ static is_binary_header is_binary_literal_header(grpc_chttp2_hpack_parser *p) { static is_binary_header is_binary_indexed_header(grpc_chttp2_hpack_parser *p) { grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index); - if (!elem) return ERROR_HEADER; + if (!elem) { + gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index); + return ERROR_HEADER; + } return grpc_is_binary_header( (const char *)GPR_SLICE_START_PTR(elem->key->slice), GPR_SLICE_LENGTH(elem->key->slice)) @@ -1338,15 +1363,7 @@ static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p, /* PUBLIC INTERFACE */ static void on_header_not_set(void *user_data, grpc_mdelem *md) { - char *keyhex = gpr_dump_slice(md->key->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - char *valuehex = - gpr_dump_slice(md->value->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); - gpr_log(GPR_ERROR, "on_header callback not set; key=%s value=%s", keyhex, - valuehex); - gpr_free(keyhex); - gpr_free(valuehex); - GRPC_MDELEM_UNREF(md); - abort(); + GPR_UNREACHABLE_CODE(return ); } void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) { @@ -1359,6 +1376,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) { p->value.str = NULL; p->value.capacity = 0; p->value.length = 0; + p->dynamic_table_update_allowed = 2; grpc_chttp2_hptbl_init(&p->table); } @@ -1414,6 +1432,7 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse( parser->on_header_user_data = NULL; parser->is_boundary = 0xde; parser->is_eof = 0xde; + parser->dynamic_table_update_allowed = 2; } GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0); return GRPC_CHTTP2_PARSE_OK; diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index fb894b5735..bd36357124 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -85,6 +85,8 @@ struct grpc_chttp2_hpack_parser { gpr_uint8 binary; /* is the current string huffman encoded? */ gpr_uint8 huff; + /* is a dynamic table update allowed? */ + gpr_uint8 dynamic_table_update_allowed; /* set by higher layers, used by grpc_chttp2_header_parser_parse to signal it should append a metadata boundary at the end of frame */ gpr_uint8 is_boundary; diff --git a/src/node/src/server.js b/src/node/src/server.js index d1fb627e6c..ceaa9f5d1f 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -101,28 +101,6 @@ function handleError(call, error) { } /** - * Wait for the client to close, then emit a cancelled event if the client - * cancelled. - * @access private - * @param {grpc.Call} call The call object to wait on - * @param {EventEmitter} emitter The event emitter to emit the cancelled event - * on - */ -function waitForCancel(call, emitter) { - var cancel_batch = {}; - cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; - call.startBatch(cancel_batch, function(err, result) { - if (err) { - emitter.emit('error', err); - } - if (result.cancelled) { - emitter.cancelled = true; - emitter.emit('cancelled'); - } - }); -} - -/** * Send a response to a unary or client streaming call. * @access private * @param {grpc.Call} call The call to respond on @@ -258,6 +236,13 @@ function setUpReadable(stream, deserialize) { }); } +util.inherits(ServerUnaryCall, EventEmitter); + +function ServerUnaryCall(call) { + EventEmitter.call(this); + this.call = call; +} + util.inherits(ServerWritableStream, Writable); /** @@ -311,33 +296,6 @@ function _write(chunk, encoding, callback) { ServerWritableStream.prototype._write = _write; -/** - * Send the initial metadata for a writable stream. - * @param {Metadata} responseMetadata Metadata to send - */ -function sendMetadata(responseMetadata) { - /* jshint validthis: true */ - var self = this; - if (!this.call.metadataSent) { - this.call.metadataSent = true; - var batch = []; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - this.call.startBatch(batch, function(err) { - if (err) { - self.emit('error', err); - return; - } - }); - } -} - -/** - * @inheritdoc - * @alias module:src/server~ServerWritableStream#sendMetadata - */ -ServerWritableStream.prototype.sendMetadata = sendMetadata; - util.inherits(ServerReadableStream, Readable); /** @@ -427,6 +385,31 @@ function ServerDuplexStream(call, serialize, deserialize) { ServerDuplexStream.prototype._read = _read; ServerDuplexStream.prototype._write = _write; + +/** + * Send the initial metadata for a writable stream. + * @param {Metadata} responseMetadata Metadata to send + */ +function sendMetadata(responseMetadata) { + /* jshint validthis: true */ + var self = this; + if (!this.call.metadataSent) { + this.call.metadataSent = true; + var batch = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); + this.call.startBatch(batch, function(err) { + if (err) { + self.emit('error', err); + return; + } + }); + } +} + +ServerUnaryCall.prototype.sendMetadata = sendMetadata; +ServerWritableStream.prototype.sendMetadata = sendMetadata; +ServerReadableStream.prototype.sendMetadata = sendMetadata; ServerDuplexStream.prototype.sendMetadata = sendMetadata; /** @@ -438,11 +421,37 @@ function getPeer() { return this.call.getPeer(); } +ServerUnaryCall.prototype.getPeer = getPeer; ServerReadableStream.prototype.getPeer = getPeer; ServerWritableStream.prototype.getPeer = getPeer; ServerDuplexStream.prototype.getPeer = getPeer; /** + * Wait for the client to close, then emit a cancelled event if the client + * cancelled. + */ +function waitForCancel() { + /* jshint validthis: true */ + var self = this; + var cancel_batch = {}; + cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true; + self.call.startBatch(cancel_batch, function(err, result) { + if (err) { + self.emit('error', err); + } + if (result.cancelled) { + self.cancelled = true; + self.emit('cancelled'); + } + }); +} + +ServerUnaryCall.prototype.waitForCancel = waitForCancel; +ServerReadableStream.prototype.waitForCancel = waitForCancel; +ServerWritableStream.prototype.waitForCancel = waitForCancel; +ServerDuplexStream.prototype.waitForCancel = waitForCancel; + +/** * Fully handle a unary call * @access private * @param {grpc.Call} call The call to handle @@ -450,25 +459,12 @@ ServerDuplexStream.prototype.getPeer = getPeer; * @param {Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { - var emitter = new EventEmitter(); - emitter.sendMetadata = function(responseMetadata) { - if (!call.metadataSent) { - call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - call.startBatch(batch, function() {}); - } - }; - emitter.getPeer = function() { - return call.getPeer(); - }; + var emitter = new ServerUnaryCall(call); emitter.on('error', function(error) { handleError(call, error); }); emitter.metadata = metadata; - waitForCancel(call, emitter); - emitter.call = call; + emitter.waitForCancel(); var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; call.startBatch(batch, function(err, result) { @@ -508,7 +504,7 @@ function handleUnary(call, handler, metadata) { */ function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; var batch = {}; batch[grpc.opType.RECV_MESSAGE] = true; @@ -537,19 +533,10 @@ function handleServerStreaming(call, handler, metadata) { */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); - stream.sendMetadata = function(responseMetadata) { - if (!call.metadataSent) { - call.metadataSent = true; - var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = - responseMetadata._getCoreRepresentation(); - call.startBatch(batch, function() {}); - } - }; stream.on('error', function(error) { handleError(call, error); }); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); @@ -574,7 +561,7 @@ function handleClientStreaming(call, handler, metadata) { function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, handler.deserialize); - waitForCancel(call, stream); + stream.waitForCancel(); stream.metadata = metadata; handler.func(stream); } diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 39673e4e05..523fda6849 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -312,6 +312,54 @@ describe('Generic client and server', function() { }); }); }); +describe('Server-side getPeer', function() { + function toString(val) { + return val.toString(); + } + function toBuffer(str) { + return new Buffer(str); + } + var string_service_attrs = { + 'getPeer' : { + path: '/string/getPeer', + requestStream: false, + responseStream: false, + requestSerialize: toBuffer, + requestDeserialize: toString, + responseSerialize: toBuffer, + responseDeserialize: toString + } + }; + var client; + var server; + before(function() { + server = new grpc.Server(); + server.addService(string_service_attrs, { + getPeer: function(call, callback) { + try { + callback(null, call.getPeer()); + } catch (e) { + call.emit('error', e); + } + } + }); + var port = server.bind('localhost:0', server_insecure_creds); + server.start(); + var Client = grpc.makeGenericClientConstructor(string_service_attrs); + client = new Client('localhost:' + port, + grpc.credentials.createInsecure()); + }); + after(function() { + server.forceShutdown(); + }); + it('should respond with a string representing the client', function(done) { + client.getPeer('', function(err, response) { + assert.ifError(err); + // We don't expect a specific value, just that it worked without error + done(); + }); + }); +}); describe('Echo metadata', function() { var client; var server; |