aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nicolas Noble <nnoble@google.com>2015-01-26 17:03:08 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-26 17:03:08 -0800
commitf66ccc44d7509303a22a5ed602b366d436864afd (patch)
tree440b5572ae86ebebee04f8878eeb76be802eec4e /src
parente04455a7ff1ead05445daa95d7bc822a7f40b6f5 (diff)
parent49724152edc6907fa4b872f9e7580d4b78a06f77 (diff)
Merge branch 'master' of github.com:google/grpc into json
Diffstat (limited to 'src')
-rw-r--r--src/core/channel/call_op_string.c96
-rw-r--r--src/core/channel/connected_channel.c1
-rw-r--r--src/core/httpcli/format_request.c82
-rw-r--r--src/core/support/string.c48
-rw-r--r--src/core/support/string.h21
-rw-r--r--src/core/surface/event_string.c80
-rw-r--r--src/node/client.js8
-rw-r--r--src/node/server.js1
-rw-r--r--src/node/surface_client.js23
-rw-r--r--src/node/surface_server.js9
-rw-r--r--src/node/test/client_server_test.js28
-rw-r--r--src/node/test/interop_sanity_test.js3
-rw-r--r--src/node/test/surface_test.js53
-rwxr-xr-xsrc/ruby/README.md27
14 files changed, 334 insertions, 146 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c
index 789913901a..d36d51e789 100644
--- a/src/core/channel/call_op_string.c
+++ b/src/core/channel/call_op_string.c
@@ -41,110 +41,86 @@
#include <grpc/support/alloc.h>
#include <grpc/support/useful.h>
-#define MAX_APPEND 1024
+static void put_metadata(gpr_strvec *b, grpc_mdelem *md) {
+ gpr_strvec_add(b, gpr_strdup(" key="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
+ GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT));
-typedef struct {
- size_t cap;
- size_t len;
- char *buffer;
-} buf;
-
-static void bprintf(buf *b, const char *fmt, ...) {
- va_list arg;
- if (b->len + MAX_APPEND > b->cap) {
- b->cap = GPR_MAX(b->len + MAX_APPEND, b->cap * 3 / 2);
- b->buffer = gpr_realloc(b->buffer, b->cap);
- }
- va_start(arg, fmt);
- b->len += vsprintf(b->buffer + b->len, fmt, arg);
- va_end(arg);
-}
-
-static void bputs(buf *b, const char *s) {
- size_t slen = strlen(s);
- if (b->len + slen + 1 > b->cap) {
- b->cap = GPR_MAX(b->len + slen + 1, b->cap * 3 / 2);
- b->buffer = gpr_realloc(b->buffer, b->cap);
- }
- strcat(b->buffer, s);
- b->len += slen;
-}
-
-static void put_metadata(buf *b, grpc_mdelem *md) {
- char *txt;
-
- txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice),
- GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT);
- bputs(b, " key=");
- bputs(b, txt);
- gpr_free(txt);
-
- txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
- GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT);
- bputs(b, " value=");
- bputs(b, txt);
- gpr_free(txt);
+ gpr_strvec_add(b, gpr_strdup(" value="));
+ gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice),
+ GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT));
}
char *grpc_call_op_string(grpc_call_op *op) {
- buf b = {0, 0, 0};
+ char *tmp;
+ char *out;
+
+ gpr_strvec b;
+ gpr_strvec_init(&b);
switch (op->dir) {
case GRPC_CALL_DOWN:
- bprintf(&b, ">");
+ gpr_strvec_add(&b, gpr_strdup(">"));
break;
case GRPC_CALL_UP:
- bprintf(&b, "<");
+ gpr_strvec_add(&b, gpr_strdup("<"));
break;
}
switch (op->type) {
case GRPC_SEND_METADATA:
- bprintf(&b, "SEND_METADATA");
+ gpr_strvec_add(&b, gpr_strdup("SEND_METADATA"));
put_metadata(&b, op->data.metadata);
break;
case GRPC_SEND_DEADLINE:
- bprintf(&b, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
+ gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec,
op->data.deadline.tv_nsec);
+ gpr_strvec_add(&b, tmp);
break;
case GRPC_SEND_START:
- bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset);
+ gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset);
+ gpr_strvec_add(&b, tmp);
break;
case GRPC_SEND_MESSAGE:
- bprintf(&b, "SEND_MESSAGE");
+ gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE"));
break;
case GRPC_SEND_FINISH:
- bprintf(&b, "SEND_FINISH");
+ gpr_strvec_add(&b, gpr_strdup("SEND_FINISH"));
break;
case GRPC_REQUEST_DATA:
- bprintf(&b, "REQUEST_DATA");
+ gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA"));
break;
case GRPC_RECV_METADATA:
- bprintf(&b, "RECV_METADATA");
+ gpr_strvec_add(&b, gpr_strdup("RECV_METADATA"));
put_metadata(&b, op->data.metadata);
break;
case GRPC_RECV_DEADLINE:
- bprintf(&b, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
+ gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec,
op->data.deadline.tv_nsec);
+ gpr_strvec_add(&b, tmp);
break;
case GRPC_RECV_END_OF_INITIAL_METADATA:
- bprintf(&b, "RECV_END_OF_INITIAL_METADATA");
+ gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA"));
break;
case GRPC_RECV_MESSAGE:
- bprintf(&b, "RECV_MESSAGE");
+ gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE"));
break;
case GRPC_RECV_HALF_CLOSE:
- bprintf(&b, "RECV_HALF_CLOSE");
+ gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE"));
break;
case GRPC_RECV_FINISH:
- bprintf(&b, "RECV_FINISH");
+ gpr_strvec_add(&b, gpr_strdup("RECV_FINISH"));
break;
case GRPC_CANCEL_OP:
- bprintf(&b, "CANCEL_OP");
+ gpr_strvec_add(&b, gpr_strdup("CANCEL_OP"));
break;
}
- bprintf(&b, " flags=0x%08x", op->flags);
+ gpr_asprintf(&tmp, " flags=0x%08x", op->flags);
+ gpr_strvec_add(&b, tmp);
+
+ out = gpr_strvec_flatten(&b, NULL);
+ gpr_strvec_destroy(&b);
- return b.buffer;
+ return out;
}
void grpc_call_log_op(char *file, int line, gpr_log_severity severity,
diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c
index 47c0ed3b88..55486ed5c3 100644
--- a/src/core/channel/connected_channel.c
+++ b/src/core/channel/connected_channel.c
@@ -450,6 +450,7 @@ static void recv_batch(void *user_data, grpc_transport *transport,
(int)calld->incoming_message.length,
(int)calld->incoming_message_length);
recv_error(chand, calld, __LINE__, message);
+ gpr_free(message);
}
call_op.type = GRPC_RECV_HALF_CLOSE;
call_op.dir = GRPC_CALL_UP;
diff --git a/src/core/httpcli/format_request.c b/src/core/httpcli/format_request.c
index 7a44f1266f..58bb7c740e 100644
--- a/src/core/httpcli/format_request.c
+++ b/src/core/httpcli/format_request.c
@@ -37,67 +37,57 @@
#include <stdio.h>
#include <string.h>
+#include "src/core/support/string.h"
#include <grpc/support/alloc.h>
#include <grpc/support/slice.h>
#include <grpc/support/useful.h>
-typedef struct {
- size_t length;
- size_t capacity;
- char *data;
-} sbuf;
-
-static void sbuf_append(sbuf *buf, const char *bytes, size_t len) {
- if (buf->length + len > buf->capacity) {
- buf->capacity = GPR_MAX(buf->length + len, buf->capacity * 3 / 2);
- buf->data = gpr_realloc(buf->data, buf->capacity);
- }
- memcpy(buf->data + buf->length, bytes, len);
- buf->length += len;
-}
-
-static void sbprintf(sbuf *buf, const char *fmt, ...) {
- char temp[GRPC_HTTPCLI_MAX_HEADER_LENGTH];
- size_t len;
- va_list args;
-
- va_start(args, fmt);
- len = vsprintf(temp, fmt, args);
- va_end(args);
-
- sbuf_append(buf, temp, len);
-}
-
-static void fill_common_header(const grpc_httpcli_request *request, sbuf *buf) {
+static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) {
size_t i;
- sbprintf(buf, "%s HTTP/1.0\r\n", request->path);
+ gpr_strvec_add(buf, gpr_strdup(request->path));
+ gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n"));
/* just in case some crazy server really expects HTTP/1.1 */
- sbprintf(buf, "Host: %s\r\n", request->host);
- sbprintf(buf, "Connection: close\r\n");
- sbprintf(buf, "User-Agent: %s\r\n", GRPC_HTTPCLI_USER_AGENT);
+ gpr_strvec_add(buf, gpr_strdup("Host: "));
+ gpr_strvec_add(buf, gpr_strdup(request->host));
+ gpr_strvec_add(buf, gpr_strdup("\r\n"));
+ gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n"));
+ gpr_strvec_add(buf, gpr_strdup("User-Agent: "GRPC_HTTPCLI_USER_AGENT"\r\n"));
/* user supplied headers */
for (i = 0; i < request->hdr_count; i++) {
- sbprintf(buf, "%s: %s\r\n", request->hdrs[i].key, request->hdrs[i].value);
+ gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key));
+ gpr_strvec_add(buf, gpr_strdup(": "));
+ gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].value));
+ gpr_strvec_add(buf, gpr_strdup("\r\n"));
}
}
gpr_slice grpc_httpcli_format_get_request(const grpc_httpcli_request *request) {
- sbuf out = {0, 0, NULL};
+ gpr_strvec out;
+ char *flat;
+ size_t flat_len;
- sbprintf(&out, "GET ");
+ gpr_strvec_init(&out);
+ gpr_strvec_add(&out, gpr_strdup("GET "));
fill_common_header(request, &out);
- sbprintf(&out, "\r\n");
+ gpr_strvec_add(&out, gpr_strdup("\r\n"));
- return gpr_slice_new(out.data, out.length, gpr_free);
+ flat = gpr_strvec_flatten(&out, &flat_len);
+ gpr_strvec_destroy(&out);
+
+ return gpr_slice_new(flat, flat_len, gpr_free);
}
gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request,
const char *body_bytes,
size_t body_size) {
- sbuf out = {0, 0, NULL};
+ gpr_strvec out;
+ char *tmp;
+ size_t out_len;
size_t i;
- sbprintf(&out, "POST ");
+ gpr_strvec_init(&out);
+
+ gpr_strvec_add(&out, gpr_strdup("POST "));
fill_common_header(request, &out);
if (body_bytes) {
gpr_uint8 has_content_type = 0;
@@ -108,14 +98,18 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request,
}
}
if (!has_content_type) {
- sbprintf(&out, "Content-Type: text/plain\r\n");
+ gpr_strvec_add(&out, gpr_strdup("Content-Type: text/plain\r\n"));
}
- sbprintf(&out, "Content-Length: %lu\r\n", (unsigned long)body_size);
+ gpr_asprintf(&tmp, "Content-Length: %lu\r\n", (unsigned long)body_size);
+ gpr_strvec_add(&out, tmp);
}
- sbprintf(&out, "\r\n");
+ gpr_strvec_add(&out, gpr_strdup("\r\n"));
+ tmp = gpr_strvec_flatten(&out, &out_len);
if (body_bytes) {
- sbuf_append(&out, body_bytes, body_size);
+ tmp = gpr_realloc(tmp, out_len + body_size);
+ memcpy(tmp + out_len, body_bytes, body_size);
+ out_len += body_size;
}
- return gpr_slice_new(out.data, out.length, gpr_free);
+ return gpr_slice_new(tmp, out_len, gpr_free);
}
diff --git a/src/core/support/string.c b/src/core/support/string.c
index 9b5cac7596..97bce60f94 100644
--- a/src/core/support/string.c
+++ b/src/core/support/string.c
@@ -14,7 +14,7 @@
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
+ * contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
@@ -152,3 +152,49 @@ int gpr_ltoa(long value, char *string) {
string[i] = 0;
return i;
}
+
+char *gpr_strjoin(const char **strs, size_t nstrs, size_t *final_length) {
+ size_t out_length = 0;
+ size_t i;
+ char *out;
+ for (i = 0; i < nstrs; i++) {
+ out_length += strlen(strs[i]);
+ }
+ out_length += 1; /* null terminator */
+ out = gpr_malloc(out_length);
+ out_length = 0;
+ for (i = 0; i < nstrs; i++) {
+ size_t slen = strlen(strs[i]);
+ memcpy(out + out_length, strs[i], slen);
+ out_length += slen;
+ }
+ out[out_length] = 0;
+ if (final_length != NULL) {
+ *final_length = out_length;
+ }
+ return out;
+}
+
+void gpr_strvec_init(gpr_strvec *sv) {
+ memset(sv, 0, sizeof(*sv));
+}
+
+void gpr_strvec_destroy(gpr_strvec *sv) {
+ size_t i;
+ for (i = 0; i < sv->count; i++) {
+ gpr_free(sv->strs[i]);
+ }
+ gpr_free(sv->strs);
+}
+
+void gpr_strvec_add(gpr_strvec *sv, char *str) {
+ if (sv->count == sv->capacity) {
+ sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2);
+ sv->strs = gpr_realloc(sv->strs, sizeof(char*) * sv->capacity);
+ }
+ sv->strs[sv->count++] = str;
+}
+
+char *gpr_strvec_flatten(gpr_strvec *sv, size_t *final_length) {
+ return gpr_strjoin((const char**)sv->strs, sv->count, final_length);
+}
diff --git a/src/core/support/string.h b/src/core/support/string.h
index 28b7029ecd..64e06d3b6a 100644
--- a/src/core/support/string.h
+++ b/src/core/support/string.h
@@ -81,6 +81,27 @@ void gpr_reverse_bytes(char *str, int len);
the result is undefined. */
int gpr_asprintf(char **strp, const char *format, ...);
+/* Join a set of strings, returning the resulting string.
+ Total combined length (excluding null terminator) is returned in total_length
+ if it is non-null. */
+char *gpr_strjoin(const char **strs, size_t nstrs, size_t *total_length);
+
+/* A vector of strings... for building up a final string one piece at a time */
+typedef struct {
+ char **strs;
+ size_t count;
+ size_t capacity;
+} gpr_strvec;
+
+/* Initialize/destroy */
+void gpr_strvec_init(gpr_strvec *strs);
+void gpr_strvec_destroy(gpr_strvec *strs);
+/* Add a string to a strvec, takes ownership of the string */
+void gpr_strvec_add(gpr_strvec *strs, char *add);
+/* Return a joined string with all added substrings, optionally setting
+ total_length as per gpr_strjoin */
+char *gpr_strvec_flatten(gpr_strvec *strs, size_t *total_length);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index e38ef06c9f..8975d312ee 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -38,8 +38,10 @@
#include "src/core/support/string.h"
#include <grpc/byte_buffer.h>
-static size_t addhdr(char *p, grpc_event *ev) {
- return sprintf(p, "tag:%p call:%p", ev->tag, (void *)ev->call);
+static void addhdr(gpr_strvec *buf, grpc_event *ev) {
+ char *tmp;
+ gpr_asprintf(&tmp, "tag:%p call:%p", ev->tag, (void *)ev->call);
+ gpr_strvec_add(buf, tmp);
}
static const char *errstr(grpc_op_error err) {
@@ -52,72 +54,84 @@ static const char *errstr(grpc_op_error err) {
return "UNKNOWN_UNKNOWN";
}
-static size_t adderr(char *p, grpc_op_error err) {
- return sprintf(p, " err=%s", errstr(err));
+static void adderr(gpr_strvec *buf, grpc_op_error err) {
+ char *tmp;
+ gpr_asprintf(&tmp, " err=%s", errstr(err));
+ gpr_strvec_add(buf, tmp);
}
char *grpc_event_string(grpc_event *ev) {
- char buffer[1024];
- char *p = buffer;
+ char *out;
+ char *tmp;
+ gpr_strvec buf;
if (ev == NULL) return gpr_strdup("null");
+ gpr_strvec_init(&buf);
+
switch (ev->type) {
case GRPC_SERVER_SHUTDOWN:
- p += sprintf(p, "SERVER_SHUTDOWN");
+ gpr_strvec_add(&buf, gpr_strdup("SERVER_SHUTDOWN"));
break;
case GRPC_QUEUE_SHUTDOWN:
- p += sprintf(p, "QUEUE_SHUTDOWN");
+ gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN"));
break;
case GRPC_READ:
- p += sprintf(p, "READ: ");
- p += addhdr(p, ev);
+ gpr_strvec_add(&buf, gpr_strdup("READ: "));
+ addhdr(&buf, ev);
if (ev->data.read) {
- p += sprintf(p, " %d bytes",
+ gpr_asprintf(&tmp, " %d bytes",
(int)grpc_byte_buffer_length(ev->data.read));
+ gpr_strvec_add(&buf, tmp);
} else {
- p += sprintf(p, " end-of-stream");
+ gpr_strvec_add(&buf, gpr_strdup(" end-of-stream"));
}
break;
case GRPC_INVOKE_ACCEPTED:
- p += sprintf(p, "INVOKE_ACCEPTED: ");
- p += addhdr(p, ev);
- p += adderr(p, ev->data.invoke_accepted);
+ gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: "));
+ addhdr(&buf, ev);
+ adderr(&buf, ev->data.invoke_accepted);
break;
case GRPC_WRITE_ACCEPTED:
- p += sprintf(p, "WRITE_ACCEPTED: ");
- p += addhdr(p, ev);
- p += adderr(p, ev->data.write_accepted);
+ gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: "));
+ addhdr(&buf, ev);
+ adderr(&buf, ev->data.write_accepted);
break;
case GRPC_FINISH_ACCEPTED:
- p += sprintf(p, "FINISH_ACCEPTED: ");
- p += addhdr(p, ev);
- p += adderr(p, ev->data.write_accepted);
+ gpr_strvec_add(&buf, gpr_strdup("FINISH_ACCEPTED: "));
+ addhdr(&buf, ev);
+ adderr(&buf, ev->data.write_accepted);
break;
case GRPC_CLIENT_METADATA_READ:
- p += sprintf(p, "CLIENT_METADATA_READ: ");
- p += addhdr(p, ev);
- p += sprintf(p, " %d elements", (int)ev->data.client_metadata_read.count);
+ gpr_strvec_add(&buf, gpr_strdup("CLIENT_METADATA_READ: "));
+ addhdr(&buf, ev);
+ gpr_asprintf(&tmp, " %d elements",
+ (int)ev->data.client_metadata_read.count);
+ gpr_strvec_add(&buf, tmp);
break;
case GRPC_FINISHED:
- p += sprintf(p, "FINISHED: ");
- p += addhdr(p, ev);
- p += sprintf(p, " status=%d details='%s' %d metadata elements",
+ gpr_strvec_add(&buf, gpr_strdup("FINISHED: "));
+ addhdr(&buf, ev);
+ gpr_asprintf(&tmp, " status=%d details='%s' %d metadata elements",
ev->data.finished.status, ev->data.finished.details,
(int)ev->data.finished.metadata_count);
+ gpr_strvec_add(&buf, tmp);
break;
case GRPC_SERVER_RPC_NEW:
- p += sprintf(p, "SERVER_RPC_NEW: ");
- p += addhdr(p, ev);
- p += sprintf(p, " method='%s' host='%s' %d metadata elements",
+ gpr_strvec_add(&buf, gpr_strdup("SERVER_RPC_NEW: "));
+ addhdr(&buf, ev);
+ gpr_asprintf(&tmp, " method='%s' host='%s' %d metadata elements",
ev->data.server_rpc_new.method, ev->data.server_rpc_new.host,
(int)ev->data.server_rpc_new.metadata_count);
+ gpr_strvec_add(&buf, tmp);
break;
case GRPC_COMPLETION_DO_NOT_USE:
- p += sprintf(p, "DO_NOT_USE (this is a bug)");
- p += addhdr(p, ev);
+ gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)"));
+ addhdr(&buf, ev);
break;
}
- return gpr_strdup(buffer);
+ out = gpr_strvec_flatten(&buf, NULL);
+ gpr_strvec_destroy(&buf);
+ return out;
}
diff --git a/src/node/client.js b/src/node/client.js
index 7007852b93..3a1c9eef84 100644
--- a/src/node/client.js
+++ b/src/node/client.js
@@ -161,6 +161,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) {
};
/**
+ * Cancel the ongoing call. If the call has not already finished, it will finish
+ * with status CANCELLED.
+ */
+GrpcClientStream.prototype.cancel = function() {
+ this._call.cancel();
+};
+
+/**
* Make a request on the channel to the given method with the given arguments
* @param {grpc.Channel} channel The channel on which to make the request
* @param {string} method The method to request
diff --git a/src/node/server.js b/src/node/server.js
index fe50acb5a1..03cdbe6f98 100644
--- a/src/node/server.js
+++ b/src/node/server.js
@@ -246,6 +246,7 @@ function Server(options) {
call.serverAccept(function(event) {
if (event.data.code === grpc.status.CANCELLED) {
cancelled = true;
+ stream.emit('cancelled');
}
}, 0);
call.serverEndInitialMetadata(0);
diff --git a/src/node/surface_client.js b/src/node/surface_client.js
index b63ae13e8d..16c31809f4 100644
--- a/src/node/surface_client.js
+++ b/src/node/surface_client.js
@@ -129,6 +129,16 @@ function _write(chunk, encoding, callback) {
ClientWritableObjectStream.prototype._write = _write;
/**
+ * Cancel the underlying call
+ */
+function cancel() {
+ this._stream.cancel();
+}
+
+ClientReadableObjectStream.prototype.cancel = cancel;
+ClientWritableObjectStream.prototype.cancel = cancel;
+
+/**
* Get a function that can make unary requests to the specified method.
* @param {string} method The name of the method to request
* @param {function(*):Buffer} serialize The serialization function for inputs
@@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
var stream = client.makeRequest(this.channel, method, serialize,
deserialize, metadata, deadline);
var emitter = new EventEmitter();
+ emitter.cancel = function cancel() {
+ stream.cancel();
+ };
forwardEvent(stream, emitter, 'status');
forwardEvent(stream, emitter, 'metadata');
stream.write(argument);
@@ -166,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return emitter;
}
return makeUnaryRequest;
@@ -203,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) {
callback(e);
}
});
+ stream.on('status', function forwardStatus(status) {
+ if (status.code !== client.status.OK) {
+ callback(status);
+ }
+ });
return obj_stream;
}
return makeClientStreamRequest;
diff --git a/src/node/surface_server.js b/src/node/surface_server.js
index 07c5339f62..af23ec211c 100644
--- a/src/node/surface_server.js
+++ b/src/node/surface_server.js
@@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) {
get: function() { return stream.cancelled; }
});
var self = this;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this._stream.on('data', function forwardData(chunk) {
if (!self.push(chunk)) {
self._stream.pause();
@@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) {
var options = {objectMode: true};
Writable.call(this, options);
this._stream = stream;
+ this._stream.on('cancelled', function() {
+ self.emit('cancelled');
+ });
this.on('finish', function() {
this._stream.end();
});
@@ -138,6 +144,9 @@ function makeUnaryHandler(handler) {
Object.defineProperty(call, 'cancelled', {
get: function() { return stream.cancelled;}
});
+ stream.on('cancelled', function() {
+ call.emit('cancelled');
+ });
handler(call, function sendUnaryData(err, value) {
if (err) {
stream.emit('error', err);
diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js
index 2a25908684..99438a1659 100644
--- a/src/node/test/client_server_test.js
+++ b/src/node/test/client_server_test.js
@@ -77,6 +77,14 @@ function errorHandler(stream) {
};
}
+/**
+ * Wait for a cancellation instead of responding
+ * @param {Stream} stream
+ */
+function cancelHandler(stream) {
+ // do nothing
+}
+
describe('echo client', function() {
it('should receive echo responses', function(done) {
var server = new Server();
@@ -125,6 +133,26 @@ describe('echo client', function() {
done();
});
});
+ it('should be able to cancel a call', function(done) {
+ var server = new Server();
+ var port_num = server.bind('0.0.0.0:0');
+ server.register('cancellation', cancelHandler);
+ server.start();
+
+ var channel = new grpc.Channel('localhost:' + port_num);
+ var stream = client.makeRequest(
+ channel,
+ 'cancellation',
+ null,
+ getDeadline(1));
+
+ stream.cancel();
+ stream.on('status', function(status) {
+ assert.equal(status.code, grpc.status.CANCELLED);
+ server.shutdown();
+ done();
+ });
+ });
});
/* TODO(mlumish): explore options for reducing duplication between this test
* and the insecure echo client test */
diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js
index 8ea48c359f..6cc7d444cd 100644
--- a/src/node/test/interop_sanity_test.js
+++ b/src/node/test/interop_sanity_test.js
@@ -48,6 +48,9 @@ describe('Interop tests', function() {
port = 'localhost:' + server_obj.port;
done();
});
+ after(function() {
+ server.shutdown();
+ });
// This depends on not using a binary stream
it('should pass empty_unary', function(done) {
interop_client.runTest(port, name_override, 'empty_unary', true, done);
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index 34f1a156eb..16e4869d83 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -35,6 +35,8 @@ var assert = require('assert');
var surface_server = require('../surface_server.js');
+var surface_client = require('../surface_client.js');
+
var ProtoBuf = require('protobufjs');
var grpc = require('..');
@@ -73,3 +75,54 @@ describe('Surface server constructor', function() {
}, /math.Math/);
});
});
+describe('Surface client', function() {
+ var client;
+ var server;
+ before(function() {
+ var Server = grpc.buildServer([mathService]);
+ server = new Server({
+ 'math.Math': {
+ 'div': function(stream) {},
+ 'divMany': function(stream) {},
+ 'fib': function(stream) {},
+ 'sum': function(stream) {}
+ }
+ });
+ var port = server.bind('localhost:0');
+ var Client = surface_client.makeClientConstructor(mathService);
+ client = new Client('localhost:' + port);
+ });
+ after(function() {
+ server.shutdown();
+ });
+ it('Should correctly cancel a unary call', function(done) {
+ var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a client stream call', function(done) {
+ var call = client.sum(function(err, resp) {
+ assert.strictEqual(err.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a server stream call', function(done) {
+ var call = client.fib({'limit': 5});
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+ it('Should correctly cancel a bidi stream call', function(done) {
+ var call = client.divMany();
+ call.on('status', function(status) {
+ assert.strictEqual(status.code, surface_client.status.CANCELLED);
+ done();
+ });
+ call.cancel();
+ });
+});
diff --git a/src/ruby/README.md b/src/ruby/README.md
index 7f7558dc67..7ece7e2706 100755
--- a/src/ruby/README.md
+++ b/src/ruby/README.md
@@ -14,9 +14,10 @@ INSTALLING
----------
- Install the gRPC core library
-TODO: describe this, once the core distribution mechanism is defined.
-
+ TODO: describe this, once the core distribution mechanism is defined.
+```
$ gem install grpc
+```
Installing from source
@@ -24,37 +25,47 @@ Installing from source
- Build or Install the gRPC core
E.g, from the root of the grpc [git repo](https://github.com/google/grpc)
+```
$ cd ../..
$ make && sudo make install
+```
- Install Ruby 2.x. Consider doing this with [RVM](http://rvm.io), it's a nice way of controlling
the exact ruby version that's used.
+```
$ command curl -sSL https://rvm.io/mpapis.asc | gpg --import -
$ \curl -sSL https://get.rvm.io | bash -s stable --ruby
$
$ # follow the instructions to ensure that your're using the latest stable version of Ruby
$ # and that the rvm command is installed
+```
- Install [bundler](http://bundler.io/)
+```
$ gem install bundler
+```
- Finally, install grpc ruby locally.
+```
$ cd <install_dir>
$ bundle install
$ rake # compiles the extension, runs the unit tests, see rake -T for other options
-
+```
CONTENTS
--------
Directory structure is the layout for [ruby extensions](http://guides.rubygems.org/gems-with-extensions/)
- * ext: the extension code
- * lib: the entrypoint grpc ruby library to be used in a 'require' statement
- * spec: tests
- * bin: example gRPC clients and servers, e.g,
+- ext:
+ the gRPC ruby extension
+- lib:
+ the entrypoint grpc ruby library to be used in a 'require' statement
+- spec:
+ Rspec unittest
+- bin:
+ example gRPC clients and servers, e.g,
```ruby
-# client
stub = Math::Math::Stub.new('my.test.math.server.com:8080')
req = Math::DivArgs.new(dividend: 7, divisor: 3)
logger.info("div(7/3): req=#{req.inspect}")