diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/compiler/csharp_generator.cc | 7 | ||||
-rw-r--r-- | src/compiler/csharp_generator.h | 3 | ||||
-rw-r--r-- | src/node/ext/call.cc | 23 | ||||
-rw-r--r-- | src/node/index.js | 15 | ||||
-rw-r--r-- | src/node/interop/interop_client.js | 8 | ||||
-rw-r--r-- | src/node/src/client.js | 61 | ||||
-rw-r--r-- | src/node/src/metadata.js | 181 | ||||
-rw-r--r-- | src/node/src/server.js | 65 | ||||
-rw-r--r-- | src/node/test/metadata_test.js | 193 | ||||
-rw-r--r-- | src/node/test/surface_test.js | 62 | ||||
-rw-r--r-- | src/python/grpcio/grpc/_links/service.py | 18 | ||||
-rw-r--r-- | src/python/grpcio/grpc/framework/core/_end.py | 6 | ||||
-rw-r--r-- | src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py | 2 | ||||
-rw-r--r-- | src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py | 2 | ||||
-rw-r--r-- | src/python/grpcio_test/grpc_test/_links/_transmission_test.py | 6 |
15 files changed, 533 insertions, 119 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 51d8d982e2..7b497df7f4 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -33,6 +33,7 @@ #include <cctype> #include <map> +#include <sstream> #include <vector> #include "src/compiler/csharp_generator.h" @@ -44,7 +45,6 @@ using google::protobuf::compiler::csharp::GetFileNamespace; using google::protobuf::compiler::csharp::GetClassName; using google::protobuf::compiler::csharp::GetUmbrellaClassName; -using google::protobuf::SimpleItoa; using grpc::protobuf::FileDescriptor; using grpc::protobuf::Descriptor; using grpc::protobuf::ServiceDescriptor; @@ -228,11 +228,14 @@ void GenerateStaticMethodField(Printer* out, const MethodDescriptor *method) { } void GenerateServiceDescriptorProperty(Printer* out, const ServiceDescriptor *service) { + std::ostringstream index; + index << service->index(); out->Print("// service descriptor\n"); out->Print("public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor\n"); out->Print("{\n"); out->Print(" get { return $umbrella$.Descriptor.Services[$index$]; }\n", - "umbrella", GetUmbrellaClassName(service->file()), "index", SimpleItoa(service->index())); + "umbrella", GetUmbrellaClassName(service->file()), "index", + index.str()); out->Print("}\n"); out->Print("\n"); } diff --git a/src/compiler/csharp_generator.h b/src/compiler/csharp_generator.h index 67e3ee30b5..90eb7e2984 100644 --- a/src/compiler/csharp_generator.h +++ b/src/compiler/csharp_generator.h @@ -36,10 +36,7 @@ #include "src/compiler/config.h" -using namespace std; - #include <google/protobuf/compiler/csharp/csharp_names.h> -#include <google/protobuf/stubs/strutil.h> namespace grpc_csharp_generator { diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc index 18858fa334..fddc1e214f 100644 --- a/src/node/ext/call.cc +++ b/src/node/ext/call.cc @@ -111,17 +111,19 @@ bool CreateMetadataArray(Handle<Object> metadata, grpc_metadata_array *array, NanAssignPersistent(*handle, value); resources->handles.push_back(unique_ptr<PersistentHolder>( new PersistentHolder(handle))); - continue; + } else { + return false; } - } - if (value->IsString()) { - Handle<String> string_value = value->ToString(); - NanUtf8String *utf8_value = new NanUtf8String(string_value); - resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); - current->value = **utf8_value; - current->value_length = string_value->Length(); } else { - return false; + if (value->IsString()) { + Handle<String> string_value = value->ToString(); + NanUtf8String *utf8_value = new NanUtf8String(string_value); + resources->strings.push_back(unique_ptr<NanUtf8String>(utf8_value)); + current->value = **utf8_value; + current->value_length = string_value->Length(); + } else { + return false; + } } array->count += 1; } @@ -156,8 +158,7 @@ Handle<Value> ParseMetadata(const grpc_metadata_array *metadata_array) { } if (EndsWith(elem->key, "-bin")) { array->Set(index_map[elem->key], - MakeFastBuffer( - NanNewBufferHandle(elem->value, elem->value_length))); + NanNewBufferHandle(elem->value, elem->value_length)); } else { array->Set(index_map[elem->key], NanNew(elem->value)); } diff --git a/src/node/index.js b/src/node/index.js index 889b0ac0e9..51d3fa590c 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -41,6 +41,8 @@ var client = require('./src/client.js'); var server = require('./src/server.js'); +var Metadata = require('./src/metadata.js'); + var grpc = require('bindings')('grpc'); /** @@ -107,18 +109,12 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) { * @param {function(Error, Object)} callback */ return function updateMetadata(authURI, metadata, callback) { - metadata = _.clone(metadata); - if (metadata.Authorization) { - metadata.Authorization = _.clone(metadata.Authorization); - } else { - metadata.Authorization = []; - } credential.getRequestMetadata(authURI, function(err, header) { if (err) { callback(err); return; } - metadata.Authorization.push(header.Authorization); + metadata.add('authorization', header.Authorization); callback(null, metadata); }); }; @@ -130,6 +126,11 @@ exports.getGoogleAuthDelegate = function getGoogleAuthDelegate(credential) { exports.Server = server.Server; /** + * @see module:src/metadata + */ +exports.Metadata = Metadata; + +/** * Status name to code number mapping */ exports.status = grpc.status; diff --git a/src/node/interop/interop_client.js b/src/node/interop/interop_client.js index 612dcf01f6..8fb8d66920 100644 --- a/src/node/interop/interop_client.js +++ b/src/node/interop/interop_client.js @@ -321,13 +321,7 @@ function oauth2Test(expected_user, scope, per_rpc, client, done) { credential.getAccessToken(function(err, token) { assert.ifError(err); var updateMetadata = function(authURI, metadata, callback) { - metadata = _.clone(metadata); - if (metadata.Authorization) { - metadata.Authorization = _.clone(metadata.Authorization); - } else { - metadata.Authorization = []; - } - metadata.Authorization.push('Bearer ' + token); + metadata.Add('authorization', 'Bearer ' + token); callback(null, metadata); }; var makeTestCall = function(error, client_metadata) { diff --git a/src/node/src/client.js b/src/node/src/client.js index 7b7eae51d2..e1bed3512e 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -42,7 +42,9 @@ var _ = require('lodash'); var grpc = require('bindings')('grpc.node'); -var common = require('./common.js'); +var common = require('./common'); + +var Metadata = require('./metadata'); var EventEmitter = require('events').EventEmitter; @@ -254,8 +256,7 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { * serialize * @param {function(?Error, value=)} callback The callback to for when the * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the - * call + * @param {Metadata=} metadata Metadata to add to the call * @param {Object=} options Options map * @return {EventEmitter} An event emitter for stream related events */ @@ -264,7 +265,9 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { var emitter = new EventEmitter(); var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } emitter.cancel = function cancel() { call.cancel(); @@ -283,13 +286,16 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { if (options) { message.grpcWriteFlags = options.flags; } - client_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + client_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); client_batch[grpc.opType.SEND_MESSAGE] = message; client_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; client_batch[grpc.opType.RECV_INITIAL_METADATA] = true; client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); emitter.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -304,7 +310,8 @@ function makeUnaryRequestFunction(method, serialize, deserialize) { return; } } - emitter.emit('metadata', response.metadata); + emitter.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); callback(null, deserialize(response.read)); }); }); @@ -328,7 +335,7 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { * @this {Client} Client object. Must have a channel member. * @param {function(?Error, value=)} callback The callback to for when the * response is received - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Object=} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -337,7 +344,9 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientWritableStream(call, serialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -347,7 +356,8 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { return; } var metadata_batch = {}; - metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + metadata_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); metadata_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(metadata_batch, function(err, response) { if (err) { @@ -355,12 +365,15 @@ function makeClientStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var client_batch = {}; client_batch[grpc.opType.RECV_MESSAGE] = true; client_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(client_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -398,7 +411,7 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { * @this {SurfaceClient} Client object. Must have a channel member. * @param {*} argument The argument to the call. Should be serializable with * serialize - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Object} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -407,7 +420,9 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientReadableStream(call, deserialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -421,7 +436,8 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { if (options) { message.grpcWriteFlags = options.flags; } - start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; start_batch[grpc.opType.SEND_MESSAGE] = message; start_batch[grpc.opType.SEND_CLOSE_FROM_CLIENT] = true; @@ -431,11 +447,14 @@ function makeServerStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); @@ -470,7 +489,7 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { /** * Make a bidirectional stream request with this method on the given channel. * @this {SurfaceClient} Client object. Must have a channel member. - * @param {array=} metadata Array of metadata key/value pairs to add to the + * @param {Metadata=} metadata Array of metadata key/value pairs to add to the * call * @param {Options} options Options map * @return {EventEmitter} An event emitter for stream related events @@ -479,7 +498,9 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { /* jshint validthis: true */ var call = getCall(this.channel, method, options); if (metadata === null || metadata === undefined) { - metadata = {}; + metadata = new Metadata(); + } else { + metadata = metadata.clone(); } var stream = new ClientDuplexStream(call, serialize, deserialize); this.updateMetadata(this.auth_uri, metadata, function(error, metadata) { @@ -489,7 +510,8 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { return; } var start_batch = {}; - start_batch[grpc.opType.SEND_INITIAL_METADATA] = metadata; + start_batch[grpc.opType.SEND_INITIAL_METADATA] = + metadata._getCoreRepresentation(); start_batch[grpc.opType.RECV_INITIAL_METADATA] = true; call.startBatch(start_batch, function(err, response) { if (err) { @@ -497,11 +519,14 @@ function makeBidiStreamRequestFunction(method, serialize, deserialize) { // in the other batch. return; } - stream.emit('metadata', response.metadata); + stream.emit('metadata', Metadata._fromCoreRepresentation( + response.metadata)); }); var status_batch = {}; status_batch[grpc.opType.RECV_STATUS_ON_CLIENT] = true; call.startBatch(status_batch, function(err, response) { + response.status.metadata = Metadata._fromCoreRepresentation( + response.status.metadata); stream.emit('status', response.status); if (response.status.code !== grpc.status.OK) { var error = new Error(response.status.details); diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js new file mode 100644 index 0000000000..c1da70b197 --- /dev/null +++ b/src/node/src/metadata.js @@ -0,0 +1,181 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +/** + * Metadata module + * @module + */ + +'use strict'; + +var _ = require('lodash'); + +/** + * Class for storing metadata. Keys are normalized to lowercase ASCII. + * @constructor + */ +function Metadata() { + this._internal_repr = {}; +} + +function normalizeKey(key) { + if (!(/^[A-Za-z\d_-]+$/.test(key))) { + throw new Error('Metadata keys must be nonempty strings containing only ' + + 'alphanumeric characters and hyphens'); + } + return key.toLowerCase(); +} + +function validate(key, value) { + if (_.endsWith(key, '-bin')) { + if (!(value instanceof Buffer)) { + throw new Error('keys that end with \'-bin\' must have Buffer values'); + } + } else { + if (!_.isString(value)) { + throw new Error( + 'keys that don\'t end with \'-bin\' must have String values'); + } + if (!(/^[\x20-\x7E]*$/.test(value))) { + throw new Error('Metadata string values can only contain printable ' + + 'ASCII characters and space'); + } + } +} + +/** + * Sets the given value for the given key, replacing any other values associated + * with that key. Normalizes the key. + * @param {String} key The key to set + * @param {String|Buffer} value The value to set. Must be a buffer if and only + * if the normalized key ends with '-bin' + */ +Metadata.prototype.set = function(key, value) { + key = normalizeKey(key); + validate(key, value); + this._internal_repr[key] = [value]; +}; + +/** + * Adds the given value for the given key. Normalizes the key. + * @param {String} key The key to add to. + * @param {String|Buffer} value The value to add. Must be a buffer if and only + * if the normalized key ends with '-bin' + */ +Metadata.prototype.add = function(key, value) { + key = normalizeKey(key); + validate(key, value); + if (!this._internal_repr[key]) { + this._internal_repr[key] = []; + } + this._internal_repr[key].push(value); +}; + +/** + * Remove the given key and any associated values. Normalizes the key. + * @param {String} key The key to remove + */ +Metadata.prototype.remove = function(key) { + key = normalizeKey(key); + if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { + delete this._internal_repr[key]; + } +}; + +/** + * Gets a list of all values associated with the key. Normalizes the key. + * @param {String} key The key to get + * @return {Array.<String|Buffer>} The values associated with that key + */ +Metadata.prototype.get = function(key) { + key = normalizeKey(key); + if (Object.prototype.hasOwnProperty.call(this._internal_repr, key)) { + return this._internal_repr[key]; + } else { + return []; + } +}; + +/** + * Get a map of each key to a single associated value. This reflects the most + * common way that people will want to see metadata. + * @return {Object.<String,String|Buffer>} A key/value mapping of the metadata + */ +Metadata.prototype.getMap = function() { + var result = {}; + _.forOwn(this._internal_repr, function(values, key) { + if(values.length > 0) { + result[key] = values[0]; + } + }); + return result; +}; + +/** + * Clone the metadata object. + * @return {Metadata} The new cloned object + */ +Metadata.prototype.clone = function() { + var copy = new Metadata(); + _.forOwn(this._internal_repr, function(value, key) { + copy._internal_repr[key] = _.clone(value); + }); + return copy; +}; + +/** + * Gets the metadata in the format used by interal code. Intended for internal + * use only. API stability is not guaranteed. + * @private + * @return {Object.<String, Array.<String|Buffer>>} The metadata + */ +Metadata.prototype._getCoreRepresentation = function() { + return this._internal_repr; +}; + +/** + * Creates a Metadata object from a metadata map in the internal format. + * Intended for internal use only. API stability is not guaranteed. + * @private + * @param {Object.<String, Array.<String|Buffer>>} The metadata + * @return {Metadata} The new Metadata object + */ +Metadata._fromCoreRepresentation = function(metadata) { + var newMetadata = new Metadata(); + if (metadata) { + newMetadata._internal_repr = _.cloneDeep(metadata); + } + return newMetadata; +}; + +module.exports = Metadata; diff --git a/src/node/src/server.js b/src/node/src/server.js index 137f60ed12..b6f162adf8 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -44,6 +44,8 @@ var grpc = require('bindings')('grpc.node'); var common = require('./common'); +var Metadata = require('./metadata'); + var stream = require('stream'); var Readable = stream.Readable; @@ -60,10 +62,10 @@ var EventEmitter = require('events').EventEmitter; * @param {Object} error The error object */ function handleError(call, error) { + var statusMetadata = new Metadata(); var status = { code: grpc.status.UNKNOWN, - details: 'Unknown Error', - metadata: {} + details: 'Unknown Error' }; if (error.hasOwnProperty('message')) { status.details = error.message; @@ -75,11 +77,13 @@ function handleError(call, error) { } } if (error.hasOwnProperty('metadata')) { - status.metadata = error.metadata; + statusMetadata = error.metadata; } + status.metadata = statusMetadata._getCoreRepresentation(); var error_batch = {}; if (!call.metadataSent) { - error_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + error_batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); } error_batch[grpc.opType.SEND_STATUS_FROM_SERVER] = status; call.startBatch(error_batch, function(){}); @@ -114,22 +118,24 @@ function waitForCancel(call, emitter) { * @param {*} value The value to respond with * @param {function(*):Buffer=} serialize Serialization function for the * response - * @param {Object=} metadata Optional trailing metadata to send with status + * @param {Metadata=} metadata Optional trailing metadata to send with status * @param {number=} flags Flags for modifying how the message is sent. * Defaults to 0. */ function sendUnaryResponse(call, value, serialize, metadata, flags) { var end_batch = {}; + var statusMetadata = new Metadata(); var status = { code: grpc.status.OK, - details: 'OK', - metadata: {} + details: 'OK' }; if (metadata) { - status.metadata = metadata; + statusMetadata = metadata; } + status.metadata = statusMetadata._getCoreRepresentation(); if (!call.metadataSent) { - end_batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + end_batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); call.metadataSent = true; } var message = serialize(value); @@ -151,14 +157,19 @@ function setUpWritable(stream, serialize) { stream.status = { code : grpc.status.OK, details : 'OK', - metadata : {} + metadata : new Metadata() }; stream.serialize = common.wrapIgnoreNull(serialize); function sendStatus() { var batch = {}; if (!stream.call.metadataSent) { stream.call.metadataSent = true; - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); + } + + if (stream.status.metadata) { + stream.status.metadata = stream.status.metadata._getCoreRepresentation(); } batch[grpc.opType.SEND_STATUS_FROM_SERVER] = stream.status; stream.call.startBatch(batch, function(){}); @@ -173,7 +184,7 @@ function setUpWritable(stream, serialize) { function setStatus(err) { var code = grpc.status.UNKNOWN; var details = 'Unknown Error'; - var metadata = {}; + var metadata = new Metadata(); if (err.hasOwnProperty('message')) { details = err.message; } @@ -203,7 +214,7 @@ function setUpWritable(stream, serialize) { /** * Override of Writable#end method that allows for sending metadata with a * success status. - * @param {Object=} metadata Metadata to send with the status + * @param {Metadata=} metadata Metadata to send with the status */ stream.end = function(metadata) { if (metadata) { @@ -266,7 +277,8 @@ function _write(chunk, encoding, callback) { /* jshint validthis: true */ var batch = {}; if (!this.call.metadataSent) { - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); this.call.metadataSent = true; } var message = this.serialize(chunk); @@ -289,15 +301,15 @@ ServerWritableStream.prototype._write = _write; /** * Send the initial metadata for a writable stream. - * @param {Object<String, Array<(String|Buffer)>>} responseMetadata Metadata - * to send + * @param {Metadata} responseMetadata Metadata to send */ function sendMetadata(responseMetadata) { /* jshint validthis: true */ if (!this.call.metadataSent) { this.call.metadataSent = true; var batch = []; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); this.call.startBatch(batch, function(err) { if (err) { this.emit('error', err); @@ -422,7 +434,7 @@ ServerDuplexStream.prototype.getPeer = getPeer; * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleUnary(call, handler, metadata) { var emitter = new EventEmitter(); @@ -430,7 +442,8 @@ function handleUnary(call, handler, metadata) { if (!call.metadataSent) { call.metadataSent = true; var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); call.startBatch(batch, function() {}); } }; @@ -478,7 +491,7 @@ function handleUnary(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleServerStreaming(call, handler, metadata) { var stream = new ServerWritableStream(call, handler.serialize); @@ -507,7 +520,7 @@ function handleServerStreaming(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleClientStreaming(call, handler, metadata) { var stream = new ServerReadableStream(call, handler.deserialize); @@ -515,7 +528,8 @@ function handleClientStreaming(call, handler, metadata) { if (!call.metadataSent) { call.metadataSent = true; var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = responseMetadata; + batch[grpc.opType.SEND_INITIAL_METADATA] = + responseMetadata._getCoreRepresentation(); call.startBatch(batch, function() {}); } }; @@ -542,7 +556,7 @@ function handleClientStreaming(call, handler, metadata) { * @access private * @param {grpc.Call} call The call to handle * @param {Object} handler Request handler object for the method that was called - * @param {Object} metadata Metadata from the client + * @param {Metadata} metadata Metadata from the client */ function handleBidiStreaming(call, handler, metadata) { var stream = new ServerDuplexStream(call, handler.serialize, @@ -599,7 +613,7 @@ function Server(options) { var details = event.new_call; var call = details.call; var method = details.method; - var metadata = details.metadata; + var metadata = Metadata._fromCoreRepresentation(details.metadata); if (method === null) { return; } @@ -609,7 +623,8 @@ function Server(options) { handler = handlers[method]; } else { var batch = {}; - batch[grpc.opType.SEND_INITIAL_METADATA] = {}; + batch[grpc.opType.SEND_INITIAL_METADATA] = + (new Metadata())._getCoreRepresentation(); batch[grpc.opType.SEND_STATUS_FROM_SERVER] = { code: grpc.status.UNIMPLEMENTED, details: 'This method is not available on this server.', diff --git a/src/node/test/metadata_test.js b/src/node/test/metadata_test.js new file mode 100644 index 0000000000..86383f1bad --- /dev/null +++ b/src/node/test/metadata_test.js @@ -0,0 +1,193 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +'use strict'; + +var Metadata = require('../src/metadata.js'); + +var assert = require('assert'); + +describe('Metadata', function() { + var metadata; + beforeEach(function() { + metadata = new Metadata(); + }); + describe('#set', function() { + it('Only accepts string values for non "-bin" keys', function() { + assert.throws(function() { + metadata.set('key', new Buffer('value')); + }); + assert.doesNotThrow(function() { + metadata.set('key', 'value'); + }); + }); + it('Only accepts Buffer values for "-bin" keys', function() { + assert.throws(function() { + metadata.set('key-bin', 'value'); + }); + assert.doesNotThrow(function() { + metadata.set('key-bin', new Buffer('value')); + }); + }); + it('Rejects invalid keys', function() { + assert.throws(function() { + metadata.set('key$', 'value'); + }); + assert.throws(function() { + metadata.set('', 'value'); + }); + }); + it('Rejects values with non-ASCII characters', function() { + assert.throws(function() { + metadata.set('key', 'résumé'); + }); + }); + it('Saves values that can be retrieved', function() { + metadata.set('key', 'value'); + assert.deepEqual(metadata.get('key'), ['value']); + }); + it('Overwrites previous values', function() { + metadata.set('key', 'value1'); + metadata.set('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value2']); + }); + it('Normalizes keys', function() { + metadata.set('Key', 'value1'); + assert.deepEqual(metadata.get('key'), ['value1']); + metadata.set('KEY', 'value2'); + assert.deepEqual(metadata.get('key'), ['value2']); + }); + }); + describe('#add', function() { + it('Only accepts string values for non "-bin" keys', function() { + assert.throws(function() { + metadata.add('key', new Buffer('value')); + }); + assert.doesNotThrow(function() { + metadata.add('key', 'value'); + }); + }); + it('Only accepts Buffer values for "-bin" keys', function() { + assert.throws(function() { + metadata.add('key-bin', 'value'); + }); + assert.doesNotThrow(function() { + metadata.add('key-bin', new Buffer('value')); + }); + }); + it('Rejects invalid keys', function() { + assert.throws(function() { + metadata.add('key$', 'value'); + }); + assert.throws(function() { + metadata.add('', 'value'); + }); + }); + it('Saves values that can be retrieved', function() { + metadata.add('key', 'value'); + assert.deepEqual(metadata.get('key'), ['value']); + }); + it('Combines with previous values', function() { + metadata.add('key', 'value1'); + metadata.add('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + it('Normalizes keys', function() { + metadata.add('Key', 'value1'); + assert.deepEqual(metadata.get('key'), ['value1']); + metadata.add('KEY', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + }); + describe('#remove', function() { + it('clears values from a key', function() { + metadata.add('key', 'value'); + metadata.remove('key'); + assert.deepEqual(metadata.get('key'), []); + }); + it('Normalizes keys', function() { + metadata.add('key', 'value'); + metadata.remove('KEY'); + assert.deepEqual(metadata.get('key'), []); + }); + }); + describe('#get', function() { + beforeEach(function() { + metadata.add('key', 'value1'); + metadata.add('key', 'value2'); + metadata.add('key-bin', new Buffer('value')); + }); + it('gets all values associated with a key', function() { + assert.deepEqual(metadata.get('key'), ['value1', 'value2']); + }); + it('Normalizes keys', function() { + assert.deepEqual(metadata.get('KEY'), ['value1', 'value2']); + }); + it('returns an empty list for non-existent keys', function() { + assert.deepEqual(metadata.get('non-existent-key'), []); + }); + it('returns Buffers for "-bin" keys', function() { + assert(metadata.get('key-bin')[0] instanceof Buffer); + }); + }); + describe('#getMap', function() { + it('gets a map of keys to values', function() { + metadata.add('key1', 'value1'); + metadata.add('Key2', 'value2'); + metadata.add('KEY3', 'value3'); + assert.deepEqual(metadata.getMap(), + {key1: 'value1', + key2: 'value2', + key3: 'value3'}); + }); + }); + describe('#clone', function() { + it('retains values from the original', function() { + metadata.add('key', 'value'); + var copy = metadata.clone(); + assert.deepEqual(copy.get('key'), ['value']); + }); + it('Does not see newly added values', function() { + metadata.add('key', 'value1'); + var copy = metadata.clone(); + metadata.add('key', 'value2'); + assert.deepEqual(copy.get('key'), ['value1']); + }); + it('Does not add new values to the original', function() { + metadata.add('key', 'value1'); + var copy = metadata.clone(); + copy.add('key', 'value2'); + assert.deepEqual(metadata.get('key'), ['value1']); + }); + }); +}); diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index 6e45871fc8..7c2a8d7258 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -262,6 +262,7 @@ describe('Generic client and server', function() { describe('Echo metadata', function() { var client; var server; + var metadata; before(function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); var test_service = test_proto.lookup('TestService'); @@ -294,6 +295,8 @@ describe('Echo metadata', function() { var Client = surface_client.makeProtobufClientConstructor(test_service); client = new Client('localhost:' + port, grpc.Credentials.createInsecure()); server.start(); + metadata = new grpc.Metadata(); + metadata.set('key', 'value'); }); after(function() { server.forceShutdown(); @@ -301,35 +304,35 @@ describe('Echo metadata', function() { it('with unary call', function(done) { var call = client.unary({}, function(err, data) { assert.ifError(err); - }, {key: ['value']}); + }, metadata); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); }); it('with client stream call', function(done) { var call = client.clientStream(function(err, data) { assert.ifError(err); - }, {key: ['value']}); + }, metadata); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); call.end(); }); it('with server stream call', function(done) { - var call = client.serverStream({}, {key: ['value']}); + var call = client.serverStream({}, metadata); call.on('data', function() {}); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); }); it('with bidi stream call', function(done) { - var call = client.bidiStream({key: ['value']}); + var call = client.bidiStream(metadata); call.on('data', function() {}); call.on('metadata', function(metadata) { - assert.deepEqual(metadata.key, ['value']); + assert.deepEqual(metadata.get('key'), ['value']); done(); }); call.end(); @@ -337,9 +340,10 @@ describe('Echo metadata', function() { it('shows the correct user-agent string', function(done) { var version = require('../package.json').version; var call = client.unary({}, function(err, data) { assert.ifError(err); }, - {key: ['value']}); + metadata); call.on('metadata', function(metadata) { - assert(_.startsWith(metadata['user-agent'], 'grpc-node/' + version)); + assert(_.startsWith(metadata.get('user-agent')[0], + 'grpc-node/' + version)); done(); }); }); @@ -354,13 +358,15 @@ describe('Other conditions', function() { var test_proto = ProtoBuf.loadProtoFile(__dirname + '/test_service.proto'); test_service = test_proto.lookup('TestService'); server = new grpc.Server(); + var trailer_metadata = new grpc.Metadata(); + trailer_metadata.add('trailer-present', 'yes'); server.addProtoService(test_service, { unary: function(call, cb) { var req = call.request; if (req.error) { - cb(new Error('Requested error'), null, {trailer_present: ['yes']}); + cb(new Error('Requested error'), null, trailer_metadata); } else { - cb(null, {count: 1}, {trailer_present: ['yes']}); + cb(null, {count: 1}, trailer_metadata); } }, clientStream: function(stream, cb){ @@ -369,14 +375,14 @@ describe('Other conditions', function() { stream.on('data', function(data) { if (data.error) { errored = true; - cb(new Error('Requested error'), null, {trailer_present: ['yes']}); + cb(new Error('Requested error'), null, trailer_metadata); } else { count += 1; } }); stream.on('end', function() { if (!errored) { - cb(null, {count: count}, {trailer_present: ['yes']}); + cb(null, {count: count}, trailer_metadata); } }); }, @@ -384,13 +390,13 @@ describe('Other conditions', function() { var req = stream.request; if (req.error) { var err = new Error('Requested error'); - err.metadata = {trailer_present: ['yes']}; + err.metadata = trailer_metadata; stream.emit('error', err); } else { for (var i = 0; i < 5; i++) { stream.write({count: i}); } - stream.end({trailer_present: ['yes']}); + stream.end(trailer_metadata); } }, bidiStream: function(stream) { @@ -398,10 +404,8 @@ describe('Other conditions', function() { stream.on('data', function(data) { if (data.error) { var err = new Error('Requested error'); - err.metadata = { - trailer_present: ['yes'], - count: ['' + count] - }; + err.metadata = trailer_metadata.clone(); + err.metadata.add('count', '' + count); stream.emit('error', err); } else { stream.write({count: count}); @@ -409,7 +413,7 @@ describe('Other conditions', function() { } }); stream.on('end', function() { - stream.end({trailer_present: ['yes']}); + stream.end(trailer_metadata); }); } }); @@ -510,7 +514,7 @@ describe('Other conditions', function() { assert.ifError(err); }); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -519,7 +523,7 @@ describe('Other conditions', function() { assert(err); }); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -531,7 +535,7 @@ describe('Other conditions', function() { call.write({error: false}); call.end(); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -543,7 +547,7 @@ describe('Other conditions', function() { call.write({error: true}); call.end(); call.on('status', function(status) { - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -552,7 +556,7 @@ describe('Other conditions', function() { call.on('data', function(){}); call.on('status', function(status) { assert.strictEqual(status.code, grpc.status.OK); - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -560,7 +564,7 @@ describe('Other conditions', function() { var call = client.serverStream({error: true}); call.on('data', function(){}); call.on('error', function(error) { - assert.deepEqual(error.metadata.trailer_present, ['yes']); + assert.deepEqual(error.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -572,7 +576,7 @@ describe('Other conditions', function() { call.on('data', function(){}); call.on('status', function(status) { assert.strictEqual(status.code, grpc.status.OK); - assert.deepEqual(status.metadata.trailer_present, ['yes']); + assert.deepEqual(status.metadata.get('trailer-present'), ['yes']); done(); }); }); @@ -583,7 +587,7 @@ describe('Other conditions', function() { call.end(); call.on('data', function(){}); call.on('error', function(error) { - assert.deepEqual(error.metadata.trailer_present, ['yes']); + assert.deepEqual(error.metadata.get('trailer-present'), ['yes']); done(); }); }); diff --git a/src/python/grpcio/grpc/_links/service.py b/src/python/grpcio/grpc/_links/service.py index 10634e43b5..393f80c1cc 100644 --- a/src/python/grpcio/grpc/_links/service.py +++ b/src/python/grpcio/grpc/_links/service.py @@ -316,9 +316,8 @@ class _Kernel(object): call.status(status, call) self._rpc_states.pop(call, None) - def add_port(self, port, server_credentials): + def add_port(self, address, server_credentials): with self._lock: - address = '[::]:%d' % port if self._server is None: self._completion_queue = _intermediary_low.CompletionQueue() self._server = _intermediary_low.Server(self._completion_queue) @@ -362,17 +361,20 @@ class ServiceLink(links.Link): """ @abc.abstractmethod - def add_port(self, port, server_credentials): + def add_port(self, address, server_credentials): """Adds a port on which to service RPCs after this link has been started. Args: - port: The port on which to service RPCs, or zero to request that a port - be automatically selected and used. + address: The address on which to service RPCs with a port number of zero + requesting that a port number be automatically selected and used. server_credentials: An _intermediary_low.ServerCredentials object, or None for insecure service. Returns: - A port on which RPCs will be serviced after this link has been started. + A integer port on which RPCs will be serviced after this link has been + started. This is typically the same number as the port number contained + in the passed address, but will likely be different if the port number + contained in the passed address was zero. """ raise NotImplementedError() @@ -417,8 +419,8 @@ class _ServiceLink(ServiceLink): def join_link(self, link): self._relay.set_behavior(link.accept_ticket) - def add_port(self, port, server_credentials): - return self._kernel.add_port(port, server_credentials) + def add_port(self, address, server_credentials): + return self._kernel.add_port(address, server_credentials) def start(self): self._relay.start() diff --git a/src/python/grpcio/grpc/framework/core/_end.py b/src/python/grpcio/grpc/framework/core/_end.py index fb2c532df6..5ef2f6d3a3 100644 --- a/src/python/grpcio/grpc/framework/core/_end.py +++ b/src/python/grpcio/grpc/framework/core/_end.py @@ -30,7 +30,6 @@ """Implementation of base.End.""" import abc -import enum import threading import uuid @@ -75,7 +74,7 @@ def _abort(operations): def _cancel_futures(futures): for future in futures: - futures.cancel() + future.cancel() def _future_shutdown(lock, cycle, event): @@ -83,8 +82,6 @@ def _future_shutdown(lock, cycle, event): with lock: _abort(cycle.operations.values()) _cancel_futures(cycle.futures) - pool = cycle.pool - cycle.pool.shutdown(wait=True) return in_future @@ -113,6 +110,7 @@ def _termination_action(lock, stats, operation_id, cycle): cycle.idle_actions = [] if cycle.grace: _cancel_futures(cycle.futures) + cycle.pool.shutdown(wait=False) return termination_action diff --git a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py index c8e325aea0..5ed5ec0b9a 100644 --- a/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_core_over_links_base_interface_test.py @@ -91,7 +91,7 @@ class _Implementation(test_interfaces.Implementation): service_grpc_link = service.service_link( serialization_behaviors.request_deserializers, serialization_behaviors.response_serializers) - port = service_grpc_link.add_port(0, None) + port = service_grpc_link.add_port('[::]:0', None) channel = _intermediary_low.Channel('localhost:%d' % port, None) invocation_grpc_link = invocation.invocation_link( channel, b'localhost', diff --git a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py index 5ba6d4afe6..ce7c6f9e7a 100644 --- a/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py +++ b/src/python/grpcio_test/grpc_test/_crust_over_core_over_links_face_interface_test.py @@ -84,7 +84,7 @@ class _Implementation(test_interfaces.Implementation): service_grpc_link = service.service_link( serialization_behaviors.request_deserializers, serialization_behaviors.response_serializers) - port = service_grpc_link.add_port(0, None) + port = service_grpc_link.add_port('[::]:0', None) channel = _intermediary_low.Channel('localhost:%d' % port, None) invocation_grpc_link = invocation.invocation_link( channel, b'localhost', diff --git a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py index db011bca66..0fef9b0c5a 100644 --- a/src/python/grpcio_test/grpc_test/_links/_transmission_test.py +++ b/src/python/grpcio_test/grpc_test/_links/_transmission_test.py @@ -50,7 +50,7 @@ class TransmissionTest(test_cases.TransmissionTest, unittest.TestCase): service_link = service.service_link( {self.group_and_method(): self.deserialize_request}, {self.group_and_method(): self.serialize_response}) - port = service_link.add_port(0, None) + port = service_link.add_port('[::]:0', None) service_link.start() channel = _intermediary_low.Channel('localhost:%d' % port, None) invocation_link = invocation.invocation_link( @@ -116,7 +116,7 @@ class RoundTripTest(unittest.TestCase): identity_transformation, identity_transformation) service_mate = test_utilities.RecordingLink() service_link.join_link(service_mate) - port = service_link.add_port(0, None) + port = service_link.add_port('[::]:0', None) service_link.start() channel = _intermediary_low.Channel('localhost:%d' % port, None) invocation_link = invocation.invocation_link( @@ -160,7 +160,7 @@ class RoundTripTest(unittest.TestCase): {(test_group, test_method): scenario.serialize_response}) service_mate = test_utilities.RecordingLink() service_link.join_link(service_mate) - port = service_link.add_port(0, None) + port = service_link.add_port('[::]:0', None) service_link.start() channel = _intermediary_low.Channel('localhost:%d' % port, None) invocation_link = invocation.invocation_link( |