diff options
author | Nicolas Noble <nnoble@google.com> | 2015-01-26 17:03:08 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2015-01-26 17:03:08 -0800 |
commit | f66ccc44d7509303a22a5ed602b366d436864afd (patch) | |
tree | 440b5572ae86ebebee04f8878eeb76be802eec4e /src | |
parent | e04455a7ff1ead05445daa95d7bc822a7f40b6f5 (diff) | |
parent | 49724152edc6907fa4b872f9e7580d4b78a06f77 (diff) |
Merge branch 'master' of github.com:google/grpc into json
Diffstat (limited to 'src')
-rw-r--r-- | src/core/channel/call_op_string.c | 96 | ||||
-rw-r--r-- | src/core/channel/connected_channel.c | 1 | ||||
-rw-r--r-- | src/core/httpcli/format_request.c | 82 | ||||
-rw-r--r-- | src/core/support/string.c | 48 | ||||
-rw-r--r-- | src/core/support/string.h | 21 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 80 | ||||
-rw-r--r-- | src/node/client.js | 8 | ||||
-rw-r--r-- | src/node/server.js | 1 | ||||
-rw-r--r-- | src/node/surface_client.js | 23 | ||||
-rw-r--r-- | src/node/surface_server.js | 9 | ||||
-rw-r--r-- | src/node/test/client_server_test.js | 28 | ||||
-rw-r--r-- | src/node/test/interop_sanity_test.js | 3 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 53 | ||||
-rwxr-xr-x | src/ruby/README.md | 27 |
14 files changed, 334 insertions, 146 deletions
diff --git a/src/core/channel/call_op_string.c b/src/core/channel/call_op_string.c index 789913901a..d36d51e789 100644 --- a/src/core/channel/call_op_string.c +++ b/src/core/channel/call_op_string.c @@ -41,110 +41,86 @@ #include <grpc/support/alloc.h> #include <grpc/support/useful.h> -#define MAX_APPEND 1024 +static void put_metadata(gpr_strvec *b, grpc_mdelem *md) { + gpr_strvec_add(b, gpr_strdup(" key=")); + gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), + GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT)); -typedef struct { - size_t cap; - size_t len; - char *buffer; -} buf; - -static void bprintf(buf *b, const char *fmt, ...) { - va_list arg; - if (b->len + MAX_APPEND > b->cap) { - b->cap = GPR_MAX(b->len + MAX_APPEND, b->cap * 3 / 2); - b->buffer = gpr_realloc(b->buffer, b->cap); - } - va_start(arg, fmt); - b->len += vsprintf(b->buffer + b->len, fmt, arg); - va_end(arg); -} - -static void bputs(buf *b, const char *s) { - size_t slen = strlen(s); - if (b->len + slen + 1 > b->cap) { - b->cap = GPR_MAX(b->len + slen + 1, b->cap * 3 / 2); - b->buffer = gpr_realloc(b->buffer, b->cap); - } - strcat(b->buffer, s); - b->len += slen; -} - -static void put_metadata(buf *b, grpc_mdelem *md) { - char *txt; - - txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->key->slice), - GPR_SLICE_LENGTH(md->key->slice), GPR_HEXDUMP_PLAINTEXT); - bputs(b, " key="); - bputs(b, txt); - gpr_free(txt); - - txt = gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), - GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT); - bputs(b, " value="); - bputs(b, txt); - gpr_free(txt); + gpr_strvec_add(b, gpr_strdup(" value=")); + gpr_strvec_add(b, gpr_hexdump((char *)GPR_SLICE_START_PTR(md->value->slice), + GPR_SLICE_LENGTH(md->value->slice), GPR_HEXDUMP_PLAINTEXT)); } char *grpc_call_op_string(grpc_call_op *op) { - buf b = {0, 0, 0}; + char *tmp; + char *out; + + gpr_strvec b; + gpr_strvec_init(&b); switch (op->dir) { case GRPC_CALL_DOWN: - bprintf(&b, ">"); + gpr_strvec_add(&b, gpr_strdup(">")); break; case GRPC_CALL_UP: - bprintf(&b, "<"); + gpr_strvec_add(&b, gpr_strdup("<")); break; } switch (op->type) { case GRPC_SEND_METADATA: - bprintf(&b, "SEND_METADATA"); + gpr_strvec_add(&b, gpr_strdup("SEND_METADATA")); put_metadata(&b, op->data.metadata); break; case GRPC_SEND_DEADLINE: - bprintf(&b, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec, + gpr_asprintf(&tmp, "SEND_DEADLINE %d.%09d", op->data.deadline.tv_sec, op->data.deadline.tv_nsec); + gpr_strvec_add(&b, tmp); break; case GRPC_SEND_START: - bprintf(&b, "SEND_START pollset=%p", op->data.start.pollset); + gpr_asprintf(&tmp, "SEND_START pollset=%p", op->data.start.pollset); + gpr_strvec_add(&b, tmp); break; case GRPC_SEND_MESSAGE: - bprintf(&b, "SEND_MESSAGE"); + gpr_strvec_add(&b, gpr_strdup("SEND_MESSAGE")); break; case GRPC_SEND_FINISH: - bprintf(&b, "SEND_FINISH"); + gpr_strvec_add(&b, gpr_strdup("SEND_FINISH")); break; case GRPC_REQUEST_DATA: - bprintf(&b, "REQUEST_DATA"); + gpr_strvec_add(&b, gpr_strdup("REQUEST_DATA")); break; case GRPC_RECV_METADATA: - bprintf(&b, "RECV_METADATA"); + gpr_strvec_add(&b, gpr_strdup("RECV_METADATA")); put_metadata(&b, op->data.metadata); break; case GRPC_RECV_DEADLINE: - bprintf(&b, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec, + gpr_asprintf(&tmp, "RECV_DEADLINE %d.%09d", op->data.deadline.tv_sec, op->data.deadline.tv_nsec); + gpr_strvec_add(&b, tmp); break; case GRPC_RECV_END_OF_INITIAL_METADATA: - bprintf(&b, "RECV_END_OF_INITIAL_METADATA"); + gpr_strvec_add(&b, gpr_strdup("RECV_END_OF_INITIAL_METADATA")); break; case GRPC_RECV_MESSAGE: - bprintf(&b, "RECV_MESSAGE"); + gpr_strvec_add(&b, gpr_strdup("RECV_MESSAGE")); break; case GRPC_RECV_HALF_CLOSE: - bprintf(&b, "RECV_HALF_CLOSE"); + gpr_strvec_add(&b, gpr_strdup("RECV_HALF_CLOSE")); break; case GRPC_RECV_FINISH: - bprintf(&b, "RECV_FINISH"); + gpr_strvec_add(&b, gpr_strdup("RECV_FINISH")); break; case GRPC_CANCEL_OP: - bprintf(&b, "CANCEL_OP"); + gpr_strvec_add(&b, gpr_strdup("CANCEL_OP")); break; } - bprintf(&b, " flags=0x%08x", op->flags); + gpr_asprintf(&tmp, " flags=0x%08x", op->flags); + gpr_strvec_add(&b, tmp); + + out = gpr_strvec_flatten(&b, NULL); + gpr_strvec_destroy(&b); - return b.buffer; + return out; } void grpc_call_log_op(char *file, int line, gpr_log_severity severity, diff --git a/src/core/channel/connected_channel.c b/src/core/channel/connected_channel.c index 47c0ed3b88..55486ed5c3 100644 --- a/src/core/channel/connected_channel.c +++ b/src/core/channel/connected_channel.c @@ -450,6 +450,7 @@ static void recv_batch(void *user_data, grpc_transport *transport, (int)calld->incoming_message.length, (int)calld->incoming_message_length); recv_error(chand, calld, __LINE__, message); + gpr_free(message); } call_op.type = GRPC_RECV_HALF_CLOSE; call_op.dir = GRPC_CALL_UP; diff --git a/src/core/httpcli/format_request.c b/src/core/httpcli/format_request.c index 7a44f1266f..58bb7c740e 100644 --- a/src/core/httpcli/format_request.c +++ b/src/core/httpcli/format_request.c @@ -37,67 +37,57 @@ #include <stdio.h> #include <string.h> +#include "src/core/support/string.h" #include <grpc/support/alloc.h> #include <grpc/support/slice.h> #include <grpc/support/useful.h> -typedef struct { - size_t length; - size_t capacity; - char *data; -} sbuf; - -static void sbuf_append(sbuf *buf, const char *bytes, size_t len) { - if (buf->length + len > buf->capacity) { - buf->capacity = GPR_MAX(buf->length + len, buf->capacity * 3 / 2); - buf->data = gpr_realloc(buf->data, buf->capacity); - } - memcpy(buf->data + buf->length, bytes, len); - buf->length += len; -} - -static void sbprintf(sbuf *buf, const char *fmt, ...) { - char temp[GRPC_HTTPCLI_MAX_HEADER_LENGTH]; - size_t len; - va_list args; - - va_start(args, fmt); - len = vsprintf(temp, fmt, args); - va_end(args); - - sbuf_append(buf, temp, len); -} - -static void fill_common_header(const grpc_httpcli_request *request, sbuf *buf) { +static void fill_common_header(const grpc_httpcli_request *request, gpr_strvec *buf) { size_t i; - sbprintf(buf, "%s HTTP/1.0\r\n", request->path); + gpr_strvec_add(buf, gpr_strdup(request->path)); + gpr_strvec_add(buf, gpr_strdup(" HTTP/1.0\r\n")); /* just in case some crazy server really expects HTTP/1.1 */ - sbprintf(buf, "Host: %s\r\n", request->host); - sbprintf(buf, "Connection: close\r\n"); - sbprintf(buf, "User-Agent: %s\r\n", GRPC_HTTPCLI_USER_AGENT); + gpr_strvec_add(buf, gpr_strdup("Host: ")); + gpr_strvec_add(buf, gpr_strdup(request->host)); + gpr_strvec_add(buf, gpr_strdup("\r\n")); + gpr_strvec_add(buf, gpr_strdup("Connection: close\r\n")); + gpr_strvec_add(buf, gpr_strdup("User-Agent: "GRPC_HTTPCLI_USER_AGENT"\r\n")); /* user supplied headers */ for (i = 0; i < request->hdr_count; i++) { - sbprintf(buf, "%s: %s\r\n", request->hdrs[i].key, request->hdrs[i].value); + gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].key)); + gpr_strvec_add(buf, gpr_strdup(": ")); + gpr_strvec_add(buf, gpr_strdup(request->hdrs[i].value)); + gpr_strvec_add(buf, gpr_strdup("\r\n")); } } gpr_slice grpc_httpcli_format_get_request(const grpc_httpcli_request *request) { - sbuf out = {0, 0, NULL}; + gpr_strvec out; + char *flat; + size_t flat_len; - sbprintf(&out, "GET "); + gpr_strvec_init(&out); + gpr_strvec_add(&out, gpr_strdup("GET ")); fill_common_header(request, &out); - sbprintf(&out, "\r\n"); + gpr_strvec_add(&out, gpr_strdup("\r\n")); - return gpr_slice_new(out.data, out.length, gpr_free); + flat = gpr_strvec_flatten(&out, &flat_len); + gpr_strvec_destroy(&out); + + return gpr_slice_new(flat, flat_len, gpr_free); } gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, const char *body_bytes, size_t body_size) { - sbuf out = {0, 0, NULL}; + gpr_strvec out; + char *tmp; + size_t out_len; size_t i; - sbprintf(&out, "POST "); + gpr_strvec_init(&out); + + gpr_strvec_add(&out, gpr_strdup("POST ")); fill_common_header(request, &out); if (body_bytes) { gpr_uint8 has_content_type = 0; @@ -108,14 +98,18 @@ gpr_slice grpc_httpcli_format_post_request(const grpc_httpcli_request *request, } } if (!has_content_type) { - sbprintf(&out, "Content-Type: text/plain\r\n"); + gpr_strvec_add(&out, gpr_strdup("Content-Type: text/plain\r\n")); } - sbprintf(&out, "Content-Length: %lu\r\n", (unsigned long)body_size); + gpr_asprintf(&tmp, "Content-Length: %lu\r\n", (unsigned long)body_size); + gpr_strvec_add(&out, tmp); } - sbprintf(&out, "\r\n"); + gpr_strvec_add(&out, gpr_strdup("\r\n")); + tmp = gpr_strvec_flatten(&out, &out_len); if (body_bytes) { - sbuf_append(&out, body_bytes, body_size); + tmp = gpr_realloc(tmp, out_len + body_size); + memcpy(tmp + out_len, body_bytes, body_size); + out_len += body_size; } - return gpr_slice_new(out.data, out.length, gpr_free); + return gpr_slice_new(tmp, out_len, gpr_free); } diff --git a/src/core/support/string.c b/src/core/support/string.c index 9b5cac7596..97bce60f94 100644 --- a/src/core/support/string.c +++ b/src/core/support/string.c @@ -14,7 +14,7 @@ * in the documentation and/or other materials provided with the * distribution. * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from + * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS @@ -152,3 +152,49 @@ int gpr_ltoa(long value, char *string) { string[i] = 0; return i; } + +char *gpr_strjoin(const char **strs, size_t nstrs, size_t *final_length) { + size_t out_length = 0; + size_t i; + char *out; + for (i = 0; i < nstrs; i++) { + out_length += strlen(strs[i]); + } + out_length += 1; /* null terminator */ + out = gpr_malloc(out_length); + out_length = 0; + for (i = 0; i < nstrs; i++) { + size_t slen = strlen(strs[i]); + memcpy(out + out_length, strs[i], slen); + out_length += slen; + } + out[out_length] = 0; + if (final_length != NULL) { + *final_length = out_length; + } + return out; +} + +void gpr_strvec_init(gpr_strvec *sv) { + memset(sv, 0, sizeof(*sv)); +} + +void gpr_strvec_destroy(gpr_strvec *sv) { + size_t i; + for (i = 0; i < sv->count; i++) { + gpr_free(sv->strs[i]); + } + gpr_free(sv->strs); +} + +void gpr_strvec_add(gpr_strvec *sv, char *str) { + if (sv->count == sv->capacity) { + sv->capacity = GPR_MAX(sv->capacity + 8, sv->capacity * 2); + sv->strs = gpr_realloc(sv->strs, sizeof(char*) * sv->capacity); + } + sv->strs[sv->count++] = str; +} + +char *gpr_strvec_flatten(gpr_strvec *sv, size_t *final_length) { + return gpr_strjoin((const char**)sv->strs, sv->count, final_length); +} diff --git a/src/core/support/string.h b/src/core/support/string.h index 28b7029ecd..64e06d3b6a 100644 --- a/src/core/support/string.h +++ b/src/core/support/string.h @@ -81,6 +81,27 @@ void gpr_reverse_bytes(char *str, int len); the result is undefined. */ int gpr_asprintf(char **strp, const char *format, ...); +/* Join a set of strings, returning the resulting string. + Total combined length (excluding null terminator) is returned in total_length + if it is non-null. */ +char *gpr_strjoin(const char **strs, size_t nstrs, size_t *total_length); + +/* A vector of strings... for building up a final string one piece at a time */ +typedef struct { + char **strs; + size_t count; + size_t capacity; +} gpr_strvec; + +/* Initialize/destroy */ +void gpr_strvec_init(gpr_strvec *strs); +void gpr_strvec_destroy(gpr_strvec *strs); +/* Add a string to a strvec, takes ownership of the string */ +void gpr_strvec_add(gpr_strvec *strs, char *add); +/* Return a joined string with all added substrings, optionally setting + total_length as per gpr_strjoin */ +char *gpr_strvec_flatten(gpr_strvec *strs, size_t *total_length); + #ifdef __cplusplus } #endif diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index e38ef06c9f..8975d312ee 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -38,8 +38,10 @@ #include "src/core/support/string.h" #include <grpc/byte_buffer.h> -static size_t addhdr(char *p, grpc_event *ev) { - return sprintf(p, "tag:%p call:%p", ev->tag, (void *)ev->call); +static void addhdr(gpr_strvec *buf, grpc_event *ev) { + char *tmp; + gpr_asprintf(&tmp, "tag:%p call:%p", ev->tag, (void *)ev->call); + gpr_strvec_add(buf, tmp); } static const char *errstr(grpc_op_error err) { @@ -52,72 +54,84 @@ static const char *errstr(grpc_op_error err) { return "UNKNOWN_UNKNOWN"; } -static size_t adderr(char *p, grpc_op_error err) { - return sprintf(p, " err=%s", errstr(err)); +static void adderr(gpr_strvec *buf, grpc_op_error err) { + char *tmp; + gpr_asprintf(&tmp, " err=%s", errstr(err)); + gpr_strvec_add(buf, tmp); } char *grpc_event_string(grpc_event *ev) { - char buffer[1024]; - char *p = buffer; + char *out; + char *tmp; + gpr_strvec buf; if (ev == NULL) return gpr_strdup("null"); + gpr_strvec_init(&buf); + switch (ev->type) { case GRPC_SERVER_SHUTDOWN: - p += sprintf(p, "SERVER_SHUTDOWN"); + gpr_strvec_add(&buf, gpr_strdup("SERVER_SHUTDOWN")); break; case GRPC_QUEUE_SHUTDOWN: - p += sprintf(p, "QUEUE_SHUTDOWN"); + gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN")); break; case GRPC_READ: - p += sprintf(p, "READ: "); - p += addhdr(p, ev); + gpr_strvec_add(&buf, gpr_strdup("READ: ")); + addhdr(&buf, ev); if (ev->data.read) { - p += sprintf(p, " %d bytes", + gpr_asprintf(&tmp, " %d bytes", (int)grpc_byte_buffer_length(ev->data.read)); + gpr_strvec_add(&buf, tmp); } else { - p += sprintf(p, " end-of-stream"); + gpr_strvec_add(&buf, gpr_strdup(" end-of-stream")); } break; case GRPC_INVOKE_ACCEPTED: - p += sprintf(p, "INVOKE_ACCEPTED: "); - p += addhdr(p, ev); - p += adderr(p, ev->data.invoke_accepted); + gpr_strvec_add(&buf, gpr_strdup("INVOKE_ACCEPTED: ")); + addhdr(&buf, ev); + adderr(&buf, ev->data.invoke_accepted); break; case GRPC_WRITE_ACCEPTED: - p += sprintf(p, "WRITE_ACCEPTED: "); - p += addhdr(p, ev); - p += adderr(p, ev->data.write_accepted); + gpr_strvec_add(&buf, gpr_strdup("WRITE_ACCEPTED: ")); + addhdr(&buf, ev); + adderr(&buf, ev->data.write_accepted); break; case GRPC_FINISH_ACCEPTED: - p += sprintf(p, "FINISH_ACCEPTED: "); - p += addhdr(p, ev); - p += adderr(p, ev->data.write_accepted); + gpr_strvec_add(&buf, gpr_strdup("FINISH_ACCEPTED: ")); + addhdr(&buf, ev); + adderr(&buf, ev->data.write_accepted); break; case GRPC_CLIENT_METADATA_READ: - p += sprintf(p, "CLIENT_METADATA_READ: "); - p += addhdr(p, ev); - p += sprintf(p, " %d elements", (int)ev->data.client_metadata_read.count); + gpr_strvec_add(&buf, gpr_strdup("CLIENT_METADATA_READ: ")); + addhdr(&buf, ev); + gpr_asprintf(&tmp, " %d elements", + (int)ev->data.client_metadata_read.count); + gpr_strvec_add(&buf, tmp); break; case GRPC_FINISHED: - p += sprintf(p, "FINISHED: "); - p += addhdr(p, ev); - p += sprintf(p, " status=%d details='%s' %d metadata elements", + gpr_strvec_add(&buf, gpr_strdup("FINISHED: ")); + addhdr(&buf, ev); + gpr_asprintf(&tmp, " status=%d details='%s' %d metadata elements", ev->data.finished.status, ev->data.finished.details, (int)ev->data.finished.metadata_count); + gpr_strvec_add(&buf, tmp); break; case GRPC_SERVER_RPC_NEW: - p += sprintf(p, "SERVER_RPC_NEW: "); - p += addhdr(p, ev); - p += sprintf(p, " method='%s' host='%s' %d metadata elements", + gpr_strvec_add(&buf, gpr_strdup("SERVER_RPC_NEW: ")); + addhdr(&buf, ev); + gpr_asprintf(&tmp, " method='%s' host='%s' %d metadata elements", ev->data.server_rpc_new.method, ev->data.server_rpc_new.host, (int)ev->data.server_rpc_new.metadata_count); + gpr_strvec_add(&buf, tmp); break; case GRPC_COMPLETION_DO_NOT_USE: - p += sprintf(p, "DO_NOT_USE (this is a bug)"); - p += addhdr(p, ev); + gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)")); + addhdr(&buf, ev); break; } - return gpr_strdup(buffer); + out = gpr_strvec_flatten(&buf, NULL); + gpr_strvec_destroy(&buf); + return out; } diff --git a/src/node/client.js b/src/node/client.js index 7007852b93..3a1c9eef84 100644 --- a/src/node/client.js +++ b/src/node/client.js @@ -161,6 +161,14 @@ GrpcClientStream.prototype._write = function(chunk, encoding, callback) { }; /** + * Cancel the ongoing call. If the call has not already finished, it will finish + * with status CANCELLED. + */ +GrpcClientStream.prototype.cancel = function() { + this._call.cancel(); +}; + +/** * Make a request on the channel to the given method with the given arguments * @param {grpc.Channel} channel The channel on which to make the request * @param {string} method The method to request diff --git a/src/node/server.js b/src/node/server.js index fe50acb5a1..03cdbe6f98 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -246,6 +246,7 @@ function Server(options) { call.serverAccept(function(event) { if (event.data.code === grpc.status.CANCELLED) { cancelled = true; + stream.emit('cancelled'); } }, 0); call.serverEndInitialMetadata(0); diff --git a/src/node/surface_client.js b/src/node/surface_client.js index b63ae13e8d..16c31809f4 100644 --- a/src/node/surface_client.js +++ b/src/node/surface_client.js @@ -129,6 +129,16 @@ function _write(chunk, encoding, callback) { ClientWritableObjectStream.prototype._write = _write; /** + * Cancel the underlying call + */ +function cancel() { + this._stream.cancel(); +} + +ClientReadableObjectStream.prototype.cancel = cancel; +ClientWritableObjectStream.prototype.cancel = cancel; + +/** * Get a function that can make unary requests to the specified method. * @param {string} method The name of the method to request * @param {function(*):Buffer} serialize The serialization function for inputs @@ -155,6 +165,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { var stream = client.makeRequest(this.channel, method, serialize, deserialize, metadata, deadline); var emitter = new EventEmitter(); + emitter.cancel = function cancel() { + stream.cancel(); + }; forwardEvent(stream, emitter, 'status'); forwardEvent(stream, emitter, 'metadata'); stream.write(argument); @@ -166,6 +179,11 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return emitter; } return makeUnaryRequest; @@ -203,6 +221,11 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { callback(e); } }); + stream.on('status', function forwardStatus(status) { + if (status.code !== client.status.OK) { + callback(status); + } + }); return obj_stream; } return makeClientStreamRequest; diff --git a/src/node/surface_server.js b/src/node/surface_server.js index 07c5339f62..af23ec211c 100644 --- a/src/node/surface_server.js +++ b/src/node/surface_server.js @@ -63,6 +63,9 @@ function ServerReadableObjectStream(stream) { get: function() { return stream.cancelled; } }); var self = this; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this._stream.on('data', function forwardData(chunk) { if (!self.push(chunk)) { self._stream.pause(); @@ -100,6 +103,9 @@ function ServerWritableObjectStream(stream) { var options = {objectMode: true}; Writable.call(this, options); this._stream = stream; + this._stream.on('cancelled', function() { + self.emit('cancelled'); + }); this.on('finish', function() { this._stream.end(); }); @@ -138,6 +144,9 @@ function makeUnaryHandler(handler) { Object.defineProperty(call, 'cancelled', { get: function() { return stream.cancelled;} }); + stream.on('cancelled', function() { + call.emit('cancelled'); + }); handler(call, function sendUnaryData(err, value) { if (err) { stream.emit('error', err); diff --git a/src/node/test/client_server_test.js b/src/node/test/client_server_test.js index 2a25908684..99438a1659 100644 --- a/src/node/test/client_server_test.js +++ b/src/node/test/client_server_test.js @@ -77,6 +77,14 @@ function errorHandler(stream) { }; } +/** + * Wait for a cancellation instead of responding + * @param {Stream} stream + */ +function cancelHandler(stream) { + // do nothing +} + describe('echo client', function() { it('should receive echo responses', function(done) { var server = new Server(); @@ -125,6 +133,26 @@ describe('echo client', function() { done(); }); }); + it('should be able to cancel a call', function(done) { + var server = new Server(); + var port_num = server.bind('0.0.0.0:0'); + server.register('cancellation', cancelHandler); + server.start(); + + var channel = new grpc.Channel('localhost:' + port_num); + var stream = client.makeRequest( + channel, + 'cancellation', + null, + getDeadline(1)); + + stream.cancel(); + stream.on('status', function(status) { + assert.equal(status.code, grpc.status.CANCELLED); + server.shutdown(); + done(); + }); + }); }); /* TODO(mlumish): explore options for reducing duplication between this test * and the insecure echo client test */ diff --git a/src/node/test/interop_sanity_test.js b/src/node/test/interop_sanity_test.js index 8ea48c359f..6cc7d444cd 100644 --- a/src/node/test/interop_sanity_test.js +++ b/src/node/test/interop_sanity_test.js @@ -48,6 +48,9 @@ describe('Interop tests', function() { port = 'localhost:' + server_obj.port; done(); }); + after(function() { + server.shutdown(); + }); // This depends on not using a binary stream it('should pass empty_unary', function(done) { interop_client.runTest(port, name_override, 'empty_unary', true, done); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 34f1a156eb..16e4869d83 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -35,6 +35,8 @@ var assert = require('assert'); var surface_server = require('../surface_server.js'); +var surface_client = require('../surface_client.js'); + var ProtoBuf = require('protobufjs'); var grpc = require('..'); @@ -73,3 +75,54 @@ describe('Surface server constructor', function() { }, /math.Math/); }); }); +describe('Surface client', function() { + var client; + var server; + before(function() { + var Server = grpc.buildServer([mathService]); + server = new Server({ + 'math.Math': { + 'div': function(stream) {}, + 'divMany': function(stream) {}, + 'fib': function(stream) {}, + 'sum': function(stream) {} + } + }); + var port = server.bind('localhost:0'); + var Client = surface_client.makeClientConstructor(mathService); + client = new Client('localhost:' + port); + }); + after(function() { + server.shutdown(); + }); + it('Should correctly cancel a unary call', function(done) { + var call = client.div({'divisor': 0, 'dividend': 0}, function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a client stream call', function(done) { + var call = client.sum(function(err, resp) { + assert.strictEqual(err.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a server stream call', function(done) { + var call = client.fib({'limit': 5}); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); + it('Should correctly cancel a bidi stream call', function(done) { + var call = client.divMany(); + call.on('status', function(status) { + assert.strictEqual(status.code, surface_client.status.CANCELLED); + done(); + }); + call.cancel(); + }); +}); diff --git a/src/ruby/README.md b/src/ruby/README.md index 7f7558dc67..7ece7e2706 100755 --- a/src/ruby/README.md +++ b/src/ruby/README.md @@ -14,9 +14,10 @@ INSTALLING ---------- - Install the gRPC core library -TODO: describe this, once the core distribution mechanism is defined. - + TODO: describe this, once the core distribution mechanism is defined. +``` $ gem install grpc +``` Installing from source @@ -24,37 +25,47 @@ Installing from source - Build or Install the gRPC core E.g, from the root of the grpc [git repo](https://github.com/google/grpc) +``` $ cd ../.. $ make && sudo make install +``` - Install Ruby 2.x. Consider doing this with [RVM](http://rvm.io), it's a nice way of controlling the exact ruby version that's used. +``` $ command curl -sSL https://rvm.io/mpapis.asc | gpg --import - $ \curl -sSL https://get.rvm.io | bash -s stable --ruby $ $ # follow the instructions to ensure that your're using the latest stable version of Ruby $ # and that the rvm command is installed +``` - Install [bundler](http://bundler.io/) +``` $ gem install bundler +``` - Finally, install grpc ruby locally. +``` $ cd <install_dir> $ bundle install $ rake # compiles the extension, runs the unit tests, see rake -T for other options - +``` CONTENTS -------- Directory structure is the layout for [ruby extensions](http://guides.rubygems.org/gems-with-extensions/) - * ext: the extension code - * lib: the entrypoint grpc ruby library to be used in a 'require' statement - * spec: tests - * bin: example gRPC clients and servers, e.g, +- ext: + the gRPC ruby extension +- lib: + the entrypoint grpc ruby library to be used in a 'require' statement +- spec: + Rspec unittest +- bin: + example gRPC clients and servers, e.g, ```ruby -# client stub = Math::Math::Stub.new('my.test.math.server.com:8080') req = Math::DivArgs.new(dividend: 7, divisor: 3) logger.info("div(7/3): req=#{req.inspect}") |