diff options
Diffstat (limited to 'src')
32 files changed, 229 insertions, 85 deletions
diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c index b7af714e0b..ce7ec09b89 100644 --- a/src/core/census/initialize.c +++ b/src/core/census/initialize.c @@ -37,9 +37,7 @@ static int features_enabled = CENSUS_FEATURE_NONE; int census_initialize(int features) { if (features_enabled != CENSUS_FEATURE_NONE) { - return 1; - } - if (features == CENSUS_FEATURE_NONE) { + // Must have been a previous call to census_initialize; return error return 1; } features_enabled = features; diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c index e6ddb1a11f..5b10600ab5 100644 --- a/src/core/client_config/lb_policies/pick_first.c +++ b/src/core/client_config/lb_policies/pick_first.c @@ -76,7 +76,7 @@ typedef struct { } pick_first_lb_policy; #define GET_SELECTED(p) \ - ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected)) + ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected)) void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) { pick_first_lb_policy *p = (pick_first_lb_policy *)pol; @@ -268,10 +268,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg, selected = grpc_subchannel_get_connected_subchannel(selected_subchannel); GPR_ASSERT(selected != NULL); - gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected); GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first"); /* drop the pick list: we are connected now */ GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels"); + gpr_atm_rel_store(&p->selected, (gpr_atm)selected); grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1); /* update any calls that were waiting for a pick */ diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c index 2992da8b79..748eef9bed 100644 --- a/src/core/client_config/subchannel.c +++ b/src/core/client_config/subchannel.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -519,7 +519,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) { } /* publish */ - GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con)); + /* TODO(ctiller): this full barrier seems to clear up a TSAN failure. + I'd have expected the rel_cas below to be enough, but + seemingly it's not. + Re-evaluate if we really need this. */ + gpr_atm_full_barrier(); + GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con)); c->connecting = 0; /* setup subchannel watching connected subchannel for changes; subchannel ref diff --git a/src/core/surface/init.c b/src/core/surface/init.c index 19cea4c4f6..e3ab70dba7 100644 --- a/src/core/surface/init.c +++ b/src/core/surface/init.c @@ -117,8 +117,10 @@ void grpc_init(void) { grpc_iomgr_init(); grpc_executor_init(); grpc_tracer_init("GRPC_TRACE"); - /* Only initialize census if noone else has. */ - if (census_enabled() == CENSUS_FEATURE_NONE) { + /* Only initialize census if no one else has and some features are + * available. */ + if (census_enabled() == CENSUS_FEATURE_NONE && + census_supported() != CENSUS_FEATURE_NONE) { if (census_initialize(census_supported())) { /* enable all features. */ gpr_log(GPR_ERROR, "Could not initialize census."); } diff --git a/src/core/surface/version.c b/src/core/surface/version.c index aada18e07e..262a13f184 100644 --- a/src/core/surface/version.c +++ b/src/core/surface/version.c @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc index 2aa532808c..710d7cb5c2 100644 --- a/src/cpp/client/client_context.cc +++ b/src/cpp/client/client_context.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -48,6 +48,7 @@ namespace grpc { class DefaultGlobalClientCallbacks GRPC_FINAL : public ClientContext::GlobalCallbacks { public: + ~DefaultGlobalClientCallbacks() GRPC_OVERRIDE {} void DefaultConstructor(ClientContext* context) GRPC_OVERRIDE {} void Destructor(ClientContext* context) GRPC_OVERRIDE {} }; diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 878775bbee..3bf9f3fa0f 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -53,17 +53,17 @@ namespace grpc { class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks { public: + ~DefaultGlobalCallbacks() GRPC_OVERRIDE {} void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {} }; -static Server::GlobalCallbacks* g_callbacks = nullptr; +static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr; static gpr_once g_once_init_callbacks = GPR_ONCE_INIT; static void InitGlobalCallbacks() { if (g_callbacks == nullptr) { - static DefaultGlobalCallbacks default_global_callbacks; - g_callbacks = &default_global_callbacks; + g_callbacks.reset(new DefaultGlobalCallbacks()); } } @@ -234,12 +234,12 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { } } - void Run() { + void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) { ctx_.BeginCompletionOp(&call_); - g_callbacks->PreSynchronousRequest(&ctx_); + global_callbacks->PreSynchronousRequest(&ctx_); method_->handler()->RunHandler(MethodHandler::HandlerParameter( &call_, &ctx_, request_payload_, call_.max_message_size())); - g_callbacks->PostSynchronousRequest(&ctx_); + global_callbacks->PostSynchronousRequest(&ctx_); request_payload_ = nullptr; void* ignored_tag; bool ignored_ok; @@ -287,6 +287,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); + global_callbacks_ = g_callbacks; grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } @@ -311,7 +312,7 @@ Server::~Server() { void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) { GPR_ASSERT(g_callbacks == nullptr); GPR_ASSERT(callbacks != nullptr); - g_callbacks = callbacks; + g_callbacks.reset(callbacks); } bool Server::RegisterService(const grpc::string* host, RpcService* service) { @@ -569,7 +570,7 @@ void Server::RunRpc() { } } GPR_TIMER_SCOPE("cd.Run()", 0); - cd.Run(); + cd.Run(global_callbacks_); } } diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc index 2952f94b24..3a2318d1a6 100644 --- a/src/cpp/util/byte_buffer.cc +++ b/src/cpp/util/byte_buffer.cc @@ -31,8 +31,8 @@ * */ -#include <grpc/byte_buffer_reader.h> #include <grpc++/support/byte_buffer.h> +#include <grpc/byte_buffer_reader.h> namespace grpc { @@ -84,8 +84,10 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {} ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { - Clear(); // first remove existing data - buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy + Clear(); // first remove existing data + if (buf.buffer_) { + buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy + } return *this; } diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 4c6d50356c..65813909de 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -1,6 +1,6 @@ #region Copyright notice and license -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc index c306292c04..ee703fdc91 100644 --- a/src/node/ext/byte_buffer.cc +++ b/src/node/ext/byte_buffer.cc @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -69,7 +69,7 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) { return scope.Escape(Nan::Null()); } size_t length = grpc_byte_buffer_length(buffer); - char *result = reinterpret_cast<char *>(calloc(length, sizeof(char))); + char *result = new char[length]; size_t offset = 0; grpc_byte_buffer_reader reader; grpc_byte_buffer_reader_init(&reader, buffer); diff --git a/src/node/index.js b/src/node/index.js index 0d1a7fd887..7eacdc67b1 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -51,7 +51,7 @@ var server = require('./src/server.js'); var Metadata = require('./src/metadata.js'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('./src/grpc_extension'); /** * Load a gRPC object from an existing ProtoBuf.Reflect object. diff --git a/src/node/src/client.js b/src/node/src/client.js index d57826781d..b5247a69ee 100644 --- a/src/node/src/client.js +++ b/src/node/src/client.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -51,7 +51,7 @@ var _ = require('lodash'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('./grpc_extension'); var common = require('./common'); diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js index dcbfac18f4..710ab6d879 100644 --- a/src/node/src/credentials.js +++ b/src/node/src/credentials.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -61,7 +61,7 @@ 'use strict'; -var grpc = require('bindings')('grpc_node.node'); +var grpc = require('./grpc_extension'); var CallCredentials = grpc.CallCredentials; diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js new file mode 100644 index 0000000000..d4eca65fcb --- /dev/null +++ b/src/node/src/grpc_extension.js @@ -0,0 +1,40 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +var binary = require('node-pre-gyp'); +var path = require('path'); +var binding_path = binary.find(path.resolve( + path.join(__dirname,'../../../package.json'))); +var binding = require(binding_path); + +module.exports = binding; diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js index fef79f959e..51a9f8a216 100644 --- a/src/node/src/metadata.js +++ b/src/node/src/metadata.js @@ -49,7 +49,7 @@ var _ = require('lodash'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('./grpc_extension'); /** * Class for storing metadata. Keys are normalized to lowercase ASCII. diff --git a/src/node/src/server.js b/src/node/src/server.js index ceaa9f5d1f..e5aadcd565 100644 --- a/src/node/src/server.js +++ b/src/node/src/server.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -51,7 +51,7 @@ var _ = require('lodash'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('./grpc_extension'); var common = require('./common'); diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js index f1f86b35db..2300096d03 100644 --- a/src/node/test/call_test.js +++ b/src/node/test/call_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ 'use strict'; var assert = require('assert'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('../src/grpc_extension'); /** * Helper function to return an absolute deadline given a relative timeout in diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js index 7163a5fb5e..c0ae2b769a 100644 --- a/src/node/test/channel_test.js +++ b/src/node/test/channel_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ 'use strict'; var assert = require('assert'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('../src/grpc_extension'); /** * This is used for testing functions with multiple asynchronous calls that diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js index b17cd339cb..712c70706d 100644 --- a/src/node/test/constant_test.js +++ b/src/node/test/constant_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ 'use strict'; var assert = require('assert'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('../src/grpc_extension'); /** * List of all status names diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js index 0f6c5941c4..353c6c761d 100644 --- a/src/node/test/end_to_end_test.js +++ b/src/node/test/end_to_end_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -34,7 +34,7 @@ 'use strict'; var assert = require('assert'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('../src/grpc_extension'); /** * This is used for testing functions with multiple asynchronous calls that diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js index 592f47e145..71a9647184 100644 --- a/src/node/test/server_test.js +++ b/src/node/test/server_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -36,7 +36,7 @@ var assert = require('assert'); var fs = require('fs'); var path = require('path'); -var grpc = require('bindings')('grpc_node'); +var grpc = require('../src/grpc_extension'); describe('server', function() { describe('constructor', function() { diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js index fc765ed731..c65d95f767 100644 --- a/src/node/test/surface_test.js +++ b/src/node/test/surface_test.js @@ -1,6 +1,6 @@ /* * - * Copyright 2015, Google Inc. + * Copyright 2015-2016, Google Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -952,6 +952,7 @@ describe('Call propagation', function() { describe('Cancellation', function() { it('With a unary call', function(done) { done = multiDone(done, 2); + var call; proxy_impl.unary = function(parent, callback) { client.unary(parent.request, function(err, value) { try { @@ -969,12 +970,13 @@ describe('Call propagation', function() { proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); - var call = proxy_client.unary({}, function(err, value) { + call = proxy_client.unary({}, function(err, value) { done(); }); }); it('With a client stream call', function(done) { done = multiDone(done, 2); + var call; proxy_impl.clientStream = function(parent, callback) { client.clientStream(function(err, value) { try { @@ -992,12 +994,13 @@ describe('Call propagation', function() { proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); - var call = proxy_client.clientStream(function(err, value) { + call = proxy_client.clientStream(function(err, value) { done(); }); }); it('With a server stream call', function(done) { done = multiDone(done, 2); + var call; proxy_impl.serverStream = function(parent) { var child = client.serverStream(parent.request, null, {parent: parent}); @@ -1013,13 +1016,14 @@ describe('Call propagation', function() { proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); - var call = proxy_client.serverStream({}); + call = proxy_client.serverStream({}); call.on('error', function(err) { done(); }); }); it('With a bidi stream call', function(done) { done = multiDone(done, 2); + var call; proxy_impl.bidiStream = function(parent) { var child = client.bidiStream(null, {parent: parent}); child.on('error', function(err) { @@ -1034,7 +1038,7 @@ describe('Call propagation', function() { proxy.start(); var proxy_client = new Client('localhost:' + proxy_port, grpc.credentials.createInsecure()); - var call = proxy_client.bidiStream(); + call = proxy_client.bidiStream(); call.on('error', function(err) { done(); }); diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 0784ebf91c..7ba6f9856f 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -1,4 +1,4 @@ -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -42,6 +42,7 @@ enum ClientType { enum ServerType { SYNC_SERVER = 0; ASYNC_SERVER = 1; + ASYNC_GENERIC_SERVER = 2; } enum RpcType { diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto index f01d645af7..d05a35548d 100644 --- a/src/proto/grpc/testing/echo_messages.proto +++ b/src/proto/grpc/testing/echo_messages.proto @@ -1,5 +1,5 @@ -// Copyright 2015, Google Inc. +// Copyright 2015-2016, Google Inc. // All rights reserved. // // Redistribution and use in source and binary forms, with or without @@ -41,6 +41,7 @@ message RequestParams { int32 response_message_length = 6; bool echo_peer = 7; string expected_client_identity = 8; // will force check_auth_context. + bool skip_cancelled_check = 9; } message EchoRequest { diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py index 7c7a6eebfc..f82c7f7fba 100644 --- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py +++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,7 +29,6 @@ """A thread pool that logs exceptions raised by tasks executed within it.""" -import functools import logging from concurrent import futures @@ -37,12 +36,12 @@ from concurrent import futures def _wrap(behavior): """Wraps an arbitrary callable behavior in exception-logging.""" - @functools.wraps(behavior) def _wrapping(*args, **kwargs): try: return behavior(*args, **kwargs) except Exception as e: - logging.exception('Unexpected exception from task run in logging pool!') + logging.exception( + 'Unexpected exception from %s executed in logging pool!', behavior) raise return _wrapping diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py index 452802da6a..0521e1c102 100644 --- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py +++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,6 +29,7 @@ """Tests for grpc.framework.foundation.logging_pool.""" +import threading import unittest from grpc.framework.foundation import logging_pool @@ -36,6 +37,21 @@ from grpc.framework.foundation import logging_pool _POOL_SIZE = 16 +class _CallableObject(object): + + def __init__(self): + self._lock = threading.Lock() + self._passed_values = [] + + def __call__(self, value): + with self._lock: + self._passed_values.append(value) + + def passed_values(self): + with self._lock: + return tuple(self._passed_values) + + class LoggingPoolTest(unittest.TestCase): def testUpAndDown(self): @@ -59,6 +75,14 @@ class LoggingPoolTest(unittest.TestCase): self.assertIsNotNone(raised_exception) + def testCallableObjectExecuted(self): + callable_object = _CallableObject() + passed_object = object() + with logging_pool.pool(_POOL_SIZE) as pool: + future = pool.submit(callable_object, passed_object) + self.assertIsNone(future.result()) + self.assertSequenceEqual((passed_object,), callable_object.passed_values()) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py index 3bcefa601d..c8a3a1bc74 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -30,9 +30,12 @@ """Test code for the Face layer of RPC Framework.""" import abc +import itertools import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. +from grpc.framework.foundation import logging_pool from grpc.framework.interfaces.face import face from tests.unit.framework.common import test_constants from tests.unit.framework.common import test_control @@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures = [] + for _ in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures.append(response_future) + + responses = [ + response_future.result() for response_future in response_futures] + + for request, response in zip(requests, responses): + test_messages.verify(request, response, self) + pool.shutdown(wait=True) - @unittest.skip('Parallel invocations impossible with blocking control flow!') def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + pool = logging_pool.pool(test_constants.PARALLELISM) + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + response_future = pool.submit( + self._invoker.blocking(group, method), request, + test_constants.LONG_TIMEOUT) + requests.append(request) + response_futures_to_indices[response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) @unittest.skip('Cancellation impossible with blocking control flow!') def testCancelledUnaryRequestUnaryResponse(self): diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index fc8daa992f..1d36a931e8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -31,8 +31,10 @@ import abc import contextlib +import itertools import threading import unittest +from concurrent import futures # test_interfaces is referenced from specification in this module. from grpc.framework.foundation import logging_pool @@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): test_messages.verify(second_request, second_response, self) + def testParallelInvocations(self): + for (group, method), test_messages_sequence in ( + self._digest.unary_unary_messages_sequences.iteritems()): + for test_messages in test_messages_sequence: + first_request = test_messages.request() + second_request = test_messages.request() + + first_response_future = self._invoker.future(group, method)( + first_request, test_constants.LONG_TIMEOUT) + second_response_future = self._invoker.future(group, method)( + second_request, test_constants.LONG_TIMEOUT) + first_response = first_response_future.result() + second_response = second_response_future.result() + + test_messages.verify(first_request, first_response, self) + test_messages.verify(second_request, second_response, self) + for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: @@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase): for request, response in zip(requests, responses): test_messages.verify(request, response, self) - def testParallelInvocations(self): + def testWaitingForSomeButNotAllParallelInvocations(self): + pool = logging_pool.pool(test_constants.PARALLELISM) for (group, method), test_messages_sequence in ( self._digest.unary_unary_messages_sequences.iteritems()): for test_messages in test_messages_sequence: - first_request = test_messages.request() - second_request = test_messages.request() - - first_response_future = self._invoker.future(group, method)( - first_request, test_constants.LONG_TIMEOUT) - second_response_future = self._invoker.future(group, method)( - second_request, test_constants.LONG_TIMEOUT) - first_response = first_response_future.result() - second_response = second_response_future.result() - - test_messages.verify(first_request, first_response, self) - test_messages.verify(second_request, second_response, self) - - @unittest.skip('TODO(nathaniel): implement.') - def testWaitingForSomeButNotAllParallelInvocations(self): - raise NotImplementedError() + requests = [] + response_futures_to_indices = {} + for index in range(test_constants.PARALLELISM): + request = test_messages.request() + inner_response_future = self._invoker.future(group, method)( + request, test_constants.LONG_TIMEOUT) + outer_response_future = pool.submit(inner_response_future.result) + requests.append(request) + response_futures_to_indices[outer_response_future] = index + + some_completed_response_futures_iterator = itertools.islice( + futures.as_completed(response_futures_to_indices), + test_constants.PARALLELISM / 2) + for response_future in some_completed_response_futures_iterator: + index = response_futures_to_indices[response_future] + test_messages.verify(requests[index], response_future.result(), self) + pool.shutdown(wait=True) def testCancelledUnaryRequestUnaryResponse(self): for (group, method), test_messages_sequence in ( diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py index 2e444ff09d..42a7f4e3b8 100644 --- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py +++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py @@ -1,4 +1,4 @@ -# Copyright 2015, Google Inc. +# Copyright 2015-2016, Google Inc. # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver): def unary_response(self): with self._condition: if self._abortion is not None: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) elif len(self._responses) != 1: raise AssertionError( '%d responses received, not exactly one!', len(self._responses)) @@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver): if self._abortion is None: return list(self._responses) else: - raise AssertionError('Aborted with abortion "%s"!' % self._abortion) + raise AssertionError('Aborted: "{}"!'.format(self._abortion)) def abortion(self): with self._condition: diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c index 43adafb73f..8f33d72417 100644 --- a/src/ruby/ext/grpc/rb_call.c +++ b/src/ruby/ext/grpc/rb_call.c @@ -525,12 +525,12 @@ typedef struct run_batch_stack { grpc_status_code recv_status; char *recv_status_details; size_t recv_status_details_capacity; - uint write_flag; + unsigned write_flag; } run_batch_stack; /* grpc_run_batch_stack_init ensures the run_batch_stack is properly * initialized */ -static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) { +static void grpc_run_batch_stack_init(run_batch_stack *st, unsigned write_flag) { MEMZERO(st, run_batch_stack, 1); grpc_metadata_array_init(&st->send_metadata); grpc_metadata_array_init(&st->send_trailing_metadata); @@ -696,7 +696,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag, grpc_call_error err; VALUE result = Qnil; VALUE rb_write_flag = rb_ivar_get(self, id_write_flag); - uint write_flag = 0; + unsigned write_flag = 0; TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call); /* Validate the ops args, adding them to a ruby array */ diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c index 4d719d7541..ebcc6592c2 100644 --- a/src/ruby/ext/grpc/rb_call_credentials.c +++ b/src/ruby/ext/grpc/rb_call_credentials.c @@ -79,6 +79,7 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) { static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args, VALUE exception_object) { VALUE result = rb_hash_new(); + (void)args; rb_hash_aset(result, rb_str_new2("metadata"), Qnil); /* Currently only gives the exception class name. It should be possible get more details */ @@ -132,6 +133,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata( } static void grpc_rb_call_credentials_plugin_destroy(void *state) { + (void)state; // Not sure what needs to be done here } diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c index 95af091317..516f0bdad2 100644 --- a/src/ruby/ext/grpc/rb_event_thread.c +++ b/src/ruby/ext/grpc/rb_event_thread.c @@ -102,6 +102,7 @@ static void grpc_rb_event_queue_destroy() { static void *grpc_rb_wait_for_event_no_gil(void *param) { grpc_rb_event *event = NULL; + (void)param; gpr_mu_lock(&event_queue.mu); while ((event = grpc_rb_event_queue_dequeue()) == NULL) { gpr_cv_wait(&event_queue.cv, @@ -117,6 +118,7 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) { } static void grpc_rb_event_unblocking_func(void *arg) { + (void)arg; gpr_mu_lock(&event_queue.mu); event_queue.abort = true; gpr_cv_signal(&event_queue.cv); @@ -127,6 +129,7 @@ static void grpc_rb_event_unblocking_func(void *arg) { * events */ static VALUE grpc_rb_event_thread(VALUE arg) { grpc_rb_event *event; + (void)arg; while(true) { event = (grpc_rb_event*)rb_thread_call_without_gvl( grpc_rb_wait_for_event_no_gil, NULL, |