aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-08-19 10:42:25 -0700
committerGravatar Tim Emiola <tbetbetbe@users.noreply.github.com>2015-08-19 10:42:25 -0700
commit7948de6815c9c35d831c5eb87ef111dae0ad800b (patch)
tree91eb753cef02b5567106b27d4308c10e3731161a
parent4836492061381701f15c4a121f44e6377786a07d (diff)
parent14733bde54e616300cab502f092feba744d1daa9 (diff)
Merge pull request #2960 from murgatroid99/node_compression
Add per-message compression disabling in Node
-rw-r--r--src/node/ext/call.cc11
-rw-r--r--src/node/ext/node_grpc.cc11
-rw-r--r--src/node/index.js5
-rw-r--r--src/node/src/client.js18
-rw-r--r--src/node/src/server.js26
5 files changed, 55 insertions, 16 deletions
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index a79a47427f..dafe44c7a6 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -207,6 +207,13 @@ class SendMessageOp : public Op {
if (!::node::Buffer::HasInstance(value)) {
return false;
}
+ Handle<Object> object_value = value->ToObject();
+ if (object_value->HasOwnProperty(NanNew("grpcWriteFlags"))) {
+ Handle<Value> flag_value = object_value->Get(NanNew("grpcWriteFlags"));
+ if (flag_value->IsUint32()) {
+ out->flags = flag_value->Uint32Value() & GRPC_WRITE_USED_MASK;
+ }
+ }
out->data.send_message = BufferToByteBuffer(value);
Persistent<Value> *handle = new Persistent<Value>();
NanAssignPersistent(*handle, value);
@@ -457,10 +464,6 @@ void Call::Init(Handle<Object> exports) {
NanNew<FunctionTemplate>(GetPeer)->GetFunction());
NanAssignPersistent(fun_tpl, tpl);
Handle<Function> ctr = tpl->GetFunction();
- ctr->Set(NanNew("WRITE_BUFFER_HINT"),
- NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
- ctr->Set(NanNew("WRITE_NO_COMPRESS"),
- NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
exports->Set(NanNew("Call"), ctr);
constructor = new NanCallback(ctr);
}
diff --git a/src/node/ext/node_grpc.cc b/src/node/ext/node_grpc.cc
index d93dafda79..0cf30da922 100644
--- a/src/node/ext/node_grpc.cc
+++ b/src/node/ext/node_grpc.cc
@@ -196,6 +196,16 @@ void InitConnectivityStateConstants(Handle<Object> exports) {
channel_state->Set(NanNew("FATAL_FAILURE"), FATAL_FAILURE);
}
+void InitWriteFlags(Handle<Object> exports) {
+ NanScope();
+ Handle<Object> write_flags = NanNew<Object>();
+ exports->Set(NanNew("writeFlags"), write_flags);
+ Handle<Value> BUFFER_HINT(NanNew<Uint32, uint32_t>(GRPC_WRITE_BUFFER_HINT));
+ write_flags->Set(NanNew("BUFFER_HINT"), BUFFER_HINT);
+ Handle<Value> NO_COMPRESS(NanNew<Uint32, uint32_t>(GRPC_WRITE_NO_COMPRESS));
+ write_flags->Set(NanNew("NO_COMPRESS"), NO_COMPRESS);
+}
+
void init(Handle<Object> exports) {
NanScope();
grpc_init();
@@ -204,6 +214,7 @@ void init(Handle<Object> exports) {
InitOpTypeConstants(exports);
InitPropagateConstants(exports);
InitConnectivityStateConstants(exports);
+ InitWriteFlags(exports);
grpc::node::Call::Init(exports);
grpc::node::Channel::Init(exports);
diff --git a/src/node/index.js b/src/node/index.js
index 93c65ac5c4..889b0ac0e9 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -145,6 +145,11 @@ exports.propagate = grpc.propagate;
exports.callError = grpc.callError;
/**
+ * Write flag name to code number mapping
+ */
+exports.writeFlags = grpc.writeFlags;
+
+/**
* Credentials factories
*/
exports.Credentials = grpc.Credentials;
diff --git a/src/node/src/client.js b/src/node/src/client.js
index 50cbf4a133..48fe0dd3b7 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -79,13 +79,19 @@ function ClientWritableStream(call, serialize) {
* implementation of a method needed for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk to write
- * @param {string} encoding Ignored
+ * @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Called when the write is complete
*/
function _write(chunk, encoding, callback) {
/* jshint validthis: true */
var batch = {};
- batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ var message = this.serialize(chunk);
+ if (_.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, event) {
if (err) {
// Something has gone wrong. Stop writing by failing to call callback
@@ -273,8 +279,10 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
return;
}
var client_batch = {};
+ var message = serialize(argument);
+ message.grpcWriteFlags = options.flags;
client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
- client_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ client_batch[grpc.opType.SEND_MESSAGE] = message;
client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
client_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
client_batch[grpc.opType.RECV_MESSAGE] = true;
@@ -407,9 +415,11 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) {
return;
}
var start_batch = {};
+ var message = serialize(argument);
+ message.grpcWriteFlags = options.flags;
start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata;
start_batch[grpc.opType.RECV_INITIAL_METADATA] = true;
- start_batch[grpc.opType.SEND_MESSAGE] = serialize(argument);
+ start_batch[grpc.opType.SEND_MESSAGE] = message;
start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true;
call.startBatch(start_batch, function(err, response) {
if (err) {
diff --git a/src/node/src/server.js b/src/node/src/server.js
index 8b86173f08..5037abae43 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -115,8 +115,10 @@ function waitForCancel(call, emitter) {
* @param {function(*):Buffer=} serialize Serialization function for the
* response
* @param {Object=} metadata Optional trailing metadata to send with status
+ * @param {number=} flags Flags for modifying how the message is sent.
+ * Defaults to 0.
*/
-function sendUnaryResponse(call, value, serialize, metadata) {
+function sendUnaryResponse(call, value, serialize, metadata, flags) {
var end_batch = {};
var status = {
code: grpc.status.OK,
@@ -130,7 +132,9 @@ function sendUnaryResponse(call, value, serialize, metadata) {
end_batch[grpc.opType.SEND_INITIAL_METADATA] = {};
call.metadataSent = true;
}
- end_batch[grpc.opType.SEND_MESSAGE] = serialize(value);
+ var message = serialize(value);
+ message.grpcWriteFlags = flags;
+ end_batch[grpc.opType.SEND_MESSAGE] = message;
end_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status;
call.startBatch(end_batch, function (){});
}
@@ -254,7 +258,7 @@ function ServerWritableStream(call, serialize) {
* for implementing stream.Writable.
* @access private
* @param {Buffer} chunk The chunk of data to write
- * @param {string} encoding Ignored
+ * @param {string} encoding Used to pass write flags
* @param {function(Error=)} callback Callback to indicate that the write is
* complete
*/
@@ -265,7 +269,13 @@ function _write(chunk, encoding, callback) {
batch[grpc.opType.SEND_INITIAL_METADATA] = {};
this.call.metadataSent = true;
}
- batch[grpc.opType.SEND_MESSAGE] = this.serialize(chunk);
+ var message = this.serialize(chunk);
+ if (_.isFinite(encoding)) {
+ /* Attach the encoding if it is a finite number. This is the closest we
+ * can get to checking that it is valid flags */
+ message.grpcWriteFlags = encoding;
+ }
+ batch[grpc.opType.SEND_MESSAGE] = message;
this.call.startBatch(batch, function(err, value) {
if (err) {
this.emit('error', err);
@@ -450,14 +460,14 @@ function handleUnary(call, handler, metadata) {
if (emitter.cancelled) {
return;
}
- handler.func(emitter, function sendUnaryData(err, value, trailer) {
+ handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
if (err) {
if (trailer) {
err.metadata = trailer;
}
handleError(call, err);
} else {
- sendUnaryResponse(call, value, handler.serialize, trailer);
+ sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
});
@@ -514,7 +524,7 @@ function handleClientStreaming(call, handler, metadata) {
});
waitForCancel(call, stream);
stream.metadata = metadata;
- handler.func(stream, function(err, value, trailer) {
+ handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
if (trailer) {
@@ -522,7 +532,7 @@ function handleClientStreaming(call, handler, metadata) {
}
handleError(call, err);
} else {
- sendUnaryResponse(call, value, handler.serialize, trailer);
+ sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
}