diff options
author | 2015-07-22 15:02:51 -0700 | |
---|---|---|
committer | 2015-07-22 15:02:51 -0700 | |
commit | 1d8e3dfa52d08cefe581f62f2ae7452f5bfab31b (patch) | |
tree | a3f9bce326e39a0786c30597e781f0cd8efb4ebb /src/node | |
parent | 49c8a15357dc5b47ca47910c964e809b5f8043a8 (diff) |
Add compression disabling without breaking anything else
Diffstat (limited to 'src/node')
-rw-r--r-- | src/node/ext/call.cc | 7 | ||||
-rw-r--r-- | src/node/src/client.js | 22 | ||||
-rw-r--r-- | src/node/src/server.js | 22 |
3 files changed, 37 insertions, 14 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 15c9b2d97d..7d21b8b4a5 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -207,6 +207,13 @@ class SendMessageOp : public Op { if (!::node::Buffer::HasInstance(value)) { return false; } + Handle<Object> object_value = value->ToObject(); + if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) { + Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags")); + if (flag_value->IsUint32()) { + out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK; + } + } out->data.send_message = BufferToByteBuffer(value); Persistent<Value> *handle = new Persistent<Value>(); NanAssignPersistent(*handle, value); diff --git a/src/node/src/client.js b/src/node/src/client.js index b7bad949d4..de9efd7e4a 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -72,13 +72,15 @@ function ClientWritableStream(call, serialize) { * Attempt to write the given chunk. Calls the callback when done. This is an * implementation of a method needed for implementing stream.Writable. * @param {Buffer} chunk The chunk to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Called when the write is complete */ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + message.grpcWriteFlags = encoding; + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, event) { if (err) { // Something has gone wrong. Stop writing by failing to call callback @@ -207,9 +209,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. * @return {EventEmitter} An event emitter for stream related events */ - function makeUnaryRequest(argument, callback, metadata, deadline) { + function makeUnaryRequest(argument, callback, metadata, deadline, flags) { /* jshint validthis: true */ if (deadline === undefined) { deadline = Infinity; @@ -229,8 +233,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return; } var client_batch = {}; + var message = serialize(argument); + message.grpcWriteFlags = flags; client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; - client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_MESSAGE] = true; @@ -352,9 +358,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * call * @param {(number|Date)=} deadline The deadline for processing this request. * Defaults to infinite future + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. * @return {EventEmitter} An event emitter for stream related events */ - function makeServerStreamRequest(argument, metadata, deadline) { + function makeServerStreamRequest(argument, metadata, deadline, flags) { /* jshint validthis: true */ if (deadline === undefined) { deadline = Infinity; @@ -371,9 +379,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { return; } var start_batch = {}; + var message = serialize(argument); + message.grpcWriteFlags = flags; start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; - start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument); + start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; call.startBatch(start_batch, function(err, response) { if (err) { diff --git a/src/node/src/server.js b/src/node/src/server.js index 0a3a0031bd..776fafb96a 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -107,8 +107,10 @@ function waitForCancel(call, emitter) { * @param {function(*):Buffer=} serialize Serialization function for the * response * @param {Object=} metadata Optional trailing metadata to send with status + * @param {number=} flags Flags for modifying how the message is sent. + * Defaults to 0. */ -function sendUnaryResponse(call, value, serialize, metadata) { +function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; var status = { code: grpc.status.OK, @@ -122,7 +124,9 @@ function sendUnaryResponse(call, value, serialize, metadata) { end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; call.metadataSent = true; } - end_batch[grpc.opType.SEND_MESSAGE] = serialize(value); + var message = serialize(value); + message.grpcWriteFlags = flags; + end_batch[grpc.opType.SEND_MESSAGE] = message; end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(end_batch, function (){}); } @@ -243,7 +247,7 @@ function ServerWritableStream(call, serialize) { * Start writing a chunk of data. This is an implementation of a method required * for implementing stream.Writable. * @param {Buffer} chunk The chunk of data to write - * @param {string} encoding Ignored + * @param {string} encoding Used to pass write flags * @param {function(Error=)} callback Callback to indicate that the write is * complete */ @@ -254,7 +258,9 @@ function _write(chunk, encoding, callback) { batch[grpc.opType.SEND_INITIAL_METADATA] = {}; this.call.metadataSent = true; } - batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk); + var message = this.serialize(chunk); + message.grpcWriteFlags = encoding; + batch[grpc.opType.SEND_MESSAGE] = message; this.call.startBatch(batch, function(err, value) { if (err) { this.emit('error', err); @@ -411,14 +417,14 @@ function handleUnary(call, handler, metadata) { if (emitter.cancelled) { return; } - handler.func(emitter, function sendUnaryData(err, value, trailer) { + handler.func(emitter, function sendUnaryData(err, value, trailer, flags) { if (err) { if (trailer) { err.metadata = trailer; } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); }); @@ -473,7 +479,7 @@ function handleClientStreaming(call, handler, metadata) { }); waitForCancel(call, stream); stream.metadata = metadata; - handler.func(stream, function(err, value, trailer) { + handler.func(stream, function(err, value, trailer, flags) { stream.terminate(); if (err) { if (trailer) { @@ -481,7 +487,7 @@ function handleClientStreaming(call, handler, metadata) { } handleError(call, err); } else { - sendUnaryResponse(call, value, handler.serialize, trailer); + sendUnaryResponse(call, value, handler.serialize, trailer, flags); } }); } |