aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/iomgr/endpoint_pair_posix.c3
-rw-r--r--src/core/transport/chttp2/hpack_parser.c39
-rw-r--r--src/core/transport/chttp2/hpack_parser.h2
-rw-r--r--src/node/src/server.js139
-rw-r--r--src/node/test/surface_test.js48
5 files changed, 145 insertions, 86 deletions
diff --git a/src/core/iomgr/endpoint_pair_posix.c b/src/core/iomgr/endpoint_pair_posix.c
index deae9c6875..56f6f146fd 100644
--- a/src/core/iomgr/endpoint_pair_posix.c
+++ b/src/core/iomgr/endpoint_pair_posix.c
@@ -36,6 +36,7 @@
#ifdef GPR_POSIX_SOCKET
#include "src/core/iomgr/endpoint_pair.h"
+#include "src/core/iomgr/socket_utils_posix.h"
#include <errno.h>
#include <fcntl.h>
@@ -56,6 +57,8 @@ static void create_sockets(int sv[2]) {
GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
flags = fcntl(sv[1], F_GETFL, 0);
GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[0]));
+ GPR_ASSERT(grpc_set_socket_no_sigpipe_if_possible(sv[1]));
}
grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char *name,
diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c
index d38ff68754..e5453000ec 100644
--- a/src/core/transport/chttp2/hpack_parser.c
+++ b/src/core/transport/chttp2/hpack_parser.c
@@ -728,6 +728,7 @@ static int finish_indexed_field(grpc_chttp2_hpack_parser *p,
/* parse an indexed field with index < 127 */
static int parse_indexed_field(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
+ p->dynamic_table_update_allowed = 0;
p->index = (*cur) & 0x7f;
return finish_indexed_field(p, cur + 1, end);
}
@@ -737,6 +738,7 @@ static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
finish_indexed_field};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0x7f;
p->parsing.value = &p->index;
@@ -748,6 +750,7 @@ static int parse_indexed_field_x(grpc_chttp2_hpack_parser *p,
static int finish_lithdr_incidx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(md != NULL); /* handled in string parsing */
return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
take_string(p, &p->value)),
1) &&
@@ -768,6 +771,7 @@ static int parse_lithdr_incidx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_incidx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0x3f;
return parse_string_prefix(p, cur + 1, end);
@@ -779,6 +783,7 @@ static int parse_lithdr_incidx_x(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_incidx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0x3f;
p->parsing.value = &p->index;
@@ -791,6 +796,7 @@ static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_incidx_v};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
return parse_string_prefix(p, cur + 1, end);
}
@@ -799,6 +805,7 @@ static int parse_lithdr_incidx_v(grpc_chttp2_hpack_parser *p,
static int finish_lithdr_notidx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(md != NULL); /* handled in string parsing */
return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
take_string(p, &p->value)),
0) &&
@@ -819,6 +826,7 @@ static int parse_lithdr_notidx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_notidx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0xf;
return parse_string_prefix(p, cur + 1, end);
@@ -830,6 +838,7 @@ static int parse_lithdr_notidx_x(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_notidx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0xf;
p->parsing.value = &p->index;
@@ -842,6 +851,7 @@ static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_notidx_v};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
return parse_string_prefix(p, cur + 1, end);
}
@@ -850,6 +860,7 @@ static int parse_lithdr_notidx_v(grpc_chttp2_hpack_parser *p,
static int finish_lithdr_nvridx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
grpc_mdelem *md = grpc_chttp2_hptbl_lookup(&p->table, p->index);
+ GPR_ASSERT(md != NULL); /* handled in string parsing */
return on_hdr(p, grpc_mdelem_from_metadata_strings(GRPC_MDSTR_REF(md->key),
take_string(p, &p->value)),
0) &&
@@ -870,6 +881,7 @@ static int parse_lithdr_nvridx(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_value_string_with_indexed_key, finish_lithdr_nvridx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = (*cur) & 0xf;
return parse_string_prefix(p, cur + 1, end);
@@ -881,6 +893,7 @@ static int parse_lithdr_nvridx_x(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_string_prefix, parse_value_string_with_indexed_key,
finish_lithdr_nvridx};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
p->index = 0xf;
p->parsing.value = &p->index;
@@ -893,6 +906,7 @@ static int parse_lithdr_nvridx_v(grpc_chttp2_hpack_parser *p,
static const grpc_chttp2_hpack_parser_state and_then[] = {
parse_key_string, parse_string_prefix,
parse_value_string_with_literal_key, finish_lithdr_nvridx_v};
+ p->dynamic_table_update_allowed = 0;
p->next_state = and_then;
return parse_string_prefix(p, cur + 1, end);
}
@@ -908,6 +922,10 @@ static int finish_max_tbl_size(grpc_chttp2_hpack_parser *p,
/* parse a max table size change, max size < 15 */
static int parse_max_tbl_size(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur,
const gpr_uint8 *end) {
+ if (p->dynamic_table_update_allowed == 0) {
+ return 0;
+ }
+ p->dynamic_table_update_allowed--;
p->index = (*cur) & 0x1f;
return finish_max_tbl_size(p, cur + 1, end);
}
@@ -917,6 +935,10 @@ static int parse_max_tbl_size_x(grpc_chttp2_hpack_parser *p,
const gpr_uint8 *cur, const gpr_uint8 *end) {
static const grpc_chttp2_hpack_parser_state and_then[] = {
finish_max_tbl_size};
+ if (p->dynamic_table_update_allowed == 0) {
+ return 0;
+ }
+ p->dynamic_table_update_allowed--;
p->next_state = and_then;
p->index = 0x1f;
p->parsing.value = &p->index;
@@ -1300,7 +1322,10 @@ static is_binary_header is_binary_literal_header(grpc_chttp2_hpack_parser *p) {
static is_binary_header is_binary_indexed_header(grpc_chttp2_hpack_parser *p) {
grpc_mdelem *elem = grpc_chttp2_hptbl_lookup(&p->table, p->index);
- if (!elem) return ERROR_HEADER;
+ if (!elem) {
+ gpr_log(GPR_ERROR, "Invalid HPACK index received: %d", p->index);
+ return ERROR_HEADER;
+ }
return grpc_is_binary_header(
(const char *)GPR_SLICE_START_PTR(elem->key->slice),
GPR_SLICE_LENGTH(elem->key->slice))
@@ -1338,15 +1363,7 @@ static int parse_value_string_with_literal_key(grpc_chttp2_hpack_parser *p,
/* PUBLIC INTERFACE */
static void on_header_not_set(void *user_data, grpc_mdelem *md) {
- char *keyhex = gpr_dump_slice(md->key->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- char *valuehex =
- gpr_dump_slice(md->value->slice, GPR_DUMP_HEX | GPR_DUMP_ASCII);
- gpr_log(GPR_ERROR, "on_header callback not set; key=%s value=%s", keyhex,
- valuehex);
- gpr_free(keyhex);
- gpr_free(valuehex);
- GRPC_MDELEM_UNREF(md);
- abort();
+ GPR_UNREACHABLE_CODE(return );
}
void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
@@ -1359,6 +1376,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p) {
p->value.str = NULL;
p->value.capacity = 0;
p->value.length = 0;
+ p->dynamic_table_update_allowed = 2;
grpc_chttp2_hptbl_init(&p->table);
}
@@ -1414,6 +1432,7 @@ grpc_chttp2_parse_error grpc_chttp2_header_parser_parse(
parser->on_header_user_data = NULL;
parser->is_boundary = 0xde;
parser->is_eof = 0xde;
+ parser->dynamic_table_update_allowed = 2;
}
GPR_TIMER_END("grpc_chttp2_hpack_parser_parse", 0);
return GRPC_CHTTP2_PARSE_OK;
diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h
index fb894b5735..bd36357124 100644
--- a/src/core/transport/chttp2/hpack_parser.h
+++ b/src/core/transport/chttp2/hpack_parser.h
@@ -85,6 +85,8 @@ struct grpc_chttp2_hpack_parser {
gpr_uint8 binary;
/* is the current string huffman encoded? */
gpr_uint8 huff;
+ /* is a dynamic table update allowed? */
+ gpr_uint8 dynamic_table_update_allowed;
/* set by higher layers, used by grpc_chttp2_header_parser_parse to signal
it should append a metadata boundary at the end of frame */
gpr_uint8 is_boundary;
diff --git a/src/node/src/server.js b/src/node/src/server.js
index d1fb627e6c..ceaa9f5d1f 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -101,28 +101,6 @@ function handleError(call, error) {
}
/**
- * Wait for the client to close, then emit a cancelled event if the client
- * cancelled.
- * @access private
- * @param {grpc.Call} call The call object to wait on
- * @param {EventEmitter} emitter The event emitter to emit the cancelled event
- * on
- */
-function waitForCancel(call, emitter) {
- var cancel_batch = {};
- cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
- call.startBatch(cancel_batch, function(err, result) {
- if (err) {
- emitter.emit('error', err);
- }
- if (result.cancelled) {
- emitter.cancelled = true;
- emitter.emit('cancelled');
- }
- });
-}
-
-/**
* Send a response to a unary or client streaming call.
* @access private
* @param {grpc.Call} call The call to respond on
@@ -258,6 +236,13 @@ function setUpReadable(stream, deserialize) {
});
}
+util.inherits(ServerUnaryCall, EventEmitter);
+
+function ServerUnaryCall(call) {
+ EventEmitter.call(this);
+ this.call = call;
+}
+
util.inherits(ServerWritableStream, Writable);
/**
@@ -311,33 +296,6 @@ function _write(chunk, encoding, callback) {
ServerWritableStream.prototype._write = _write;
-/**
- * Send the initial metadata for a writable stream.
- * @param {Metadata} responseMetadata Metadata to send
- */
-function sendMetadata(responseMetadata) {
- /* jshint validthis: true */
- var self = this;
- if (!this.call.metadataSent) {
- this.call.metadataSent = true;
- var batch = [];
- batch[grpc.opType.SEND_INITIAL_METADATA] =
- responseMetadata._getCoreRepresentation();
- this.call.startBatch(batch, function(err) {
- if (err) {
- self.emit('error', err);
- return;
- }
- });
- }
-}
-
-/**
- * @inheritdoc
- * @alias module:src/server~ServerWritableStream#sendMetadata
- */
-ServerWritableStream.prototype.sendMetadata = sendMetadata;
-
util.inherits(ServerReadableStream, Readable);
/**
@@ -427,6 +385,31 @@ function ServerDuplexStream(call, serialize, deserialize) {
ServerDuplexStream.prototype._read = _read;
ServerDuplexStream.prototype._write = _write;
+
+/**
+ * Send the initial metadata for a writable stream.
+ * @param {Metadata} responseMetadata Metadata to send
+ */
+function sendMetadata(responseMetadata) {
+ /* jshint validthis: true */
+ var self = this;
+ if (!this.call.metadataSent) {
+ this.call.metadataSent = true;
+ var batch = {};
+ batch[grpc.opType.SEND_INITIAL_METADATA] =
+ responseMetadata._getCoreRepresentation();
+ this.call.startBatch(batch, function(err) {
+ if (err) {
+ self.emit('error', err);
+ return;
+ }
+ });
+ }
+}
+
+ServerUnaryCall.prototype.sendMetadata = sendMetadata;
+ServerWritableStream.prototype.sendMetadata = sendMetadata;
+ServerReadableStream.prototype.sendMetadata = sendMetadata;
ServerDuplexStream.prototype.sendMetadata = sendMetadata;
/**
@@ -438,11 +421,37 @@ function getPeer() {
return this.call.getPeer();
}
+ServerUnaryCall.prototype.getPeer = getPeer;
ServerReadableStream.prototype.getPeer = getPeer;
ServerWritableStream.prototype.getPeer = getPeer;
ServerDuplexStream.prototype.getPeer = getPeer;
/**
+ * Wait for the client to close, then emit a cancelled event if the client
+ * cancelled.
+ */
+function waitForCancel() {
+ /* jshint validthis: true */
+ var self = this;
+ var cancel_batch = {};
+ cancel_batch[grpc.opType.RECV_CLOSE_ON_SERVER] = true;
+ self.call.startBatch(cancel_batch, function(err, result) {
+ if (err) {
+ self.emit('error', err);
+ }
+ if (result.cancelled) {
+ self.cancelled = true;
+ self.emit('cancelled');
+ }
+ });
+}
+
+ServerUnaryCall.prototype.waitForCancel = waitForCancel;
+ServerReadableStream.prototype.waitForCancel = waitForCancel;
+ServerWritableStream.prototype.waitForCancel = waitForCancel;
+ServerDuplexStream.prototype.waitForCancel = waitForCancel;
+
+/**
* Fully handle a unary call
* @access private
* @param {grpc.Call} call The call to handle
@@ -450,25 +459,12 @@ ServerDuplexStream.prototype.getPeer = getPeer;
* @param {Metadata} metadata Metadata from the client
*/
function handleUnary(call, handler, metadata) {
- var emitter = new EventEmitter();
- emitter.sendMetadata = function(responseMetadata) {
- if (!call.metadataSent) {
- call.metadataSent = true;
- var batch = {};
- batch[grpc.opType.SEND_INITIAL_METADATA] =
- responseMetadata._getCoreRepresentation();
- call.startBatch(batch, function() {});
- }
- };
- emitter.getPeer = function() {
- return call.getPeer();
- };
+ var emitter = new ServerUnaryCall(call);
emitter.on('error', function(error) {
handleError(call, error);
});
emitter.metadata = metadata;
- waitForCancel(call, emitter);
- emitter.call = call;
+ emitter.waitForCancel();
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
@@ -508,7 +504,7 @@ function handleUnary(call, handler, metadata) {
*/
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, handler.serialize);
- waitForCancel(call, stream);
+ stream.waitForCancel();
stream.metadata = metadata;
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
@@ -537,19 +533,10 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, handler.deserialize);
- stream.sendMetadata = function(responseMetadata) {
- if (!call.metadataSent) {
- call.metadataSent = true;
- var batch = {};
- batch[grpc.opType.SEND_INITIAL_METADATA] =
- responseMetadata._getCoreRepresentation();
- call.startBatch(batch, function() {});
- }
- };
stream.on('error', function(error) {
handleError(call, error);
});
- waitForCancel(call, stream);
+ stream.waitForCancel();
stream.metadata = metadata;
handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
@@ -574,7 +561,7 @@ function handleClientStreaming(call, handler, metadata) {
function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, handler.serialize,
handler.deserialize);
- waitForCancel(call, stream);
+ stream.waitForCancel();
stream.metadata = metadata;
handler.func(stream);
}
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 39673e4e05..523fda6849 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -312,6 +312,54 @@ describe('Generic client and server', function() {
});
});
});
+describe('Server-side getPeer', function() {
+ function toString(val) {
+ return val.toString();
+ }
+ function toBuffer(str) {
+ return new Buffer(str);
+ }
+ var string_service_attrs = {
+ 'getPeer' : {
+ path: '/string/getPeer',
+ requestStream: false,
+ responseStream: false,
+ requestSerialize: toBuffer,
+ requestDeserialize: toString,
+ responseSerialize: toBuffer,
+ responseDeserialize: toString
+ }
+ };
+ var client;
+ var server;
+ before(function() {
+ server = new grpc.Server();
+ server.addService(string_service_attrs, {
+ getPeer: function(call, callback) {
+ try {
+ callback(null, call.getPeer());
+ } catch (e) {
+ call.emit('error', e);
+ }
+ }
+ });
+ var port = server.bind('localhost:0', server_insecure_creds);
+ server.start();
+ var Client = grpc.makeGenericClientConstructor(string_service_attrs);
+ client = new Client('localhost:' + port,
+ grpc.credentials.createInsecure());
+ });
+ after(function() {
+ server.forceShutdown();
+ });
+ it('should respond with a string representing the client', function(done) {
+ client.getPeer('', function(err, response) {
+ assert.ifError(err);
+ // We don't expect a specific value, just that it worked without error
+ done();
+ });
+ });
+});
describe('Echo metadata', function() {
var client;
var server;