aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/census/initialize.c4
-rw-r--r--src/core/client_config/lb_policies/pick_first.c4
-rw-r--r--src/core/client_config/subchannel.c9
-rw-r--r--src/core/surface/init.c6
-rw-r--r--src/core/surface/version.c2
-rw-r--r--src/cpp/client/client_context.cc3
-rw-r--r--src/cpp/server/server.cc19
-rw-r--r--src/cpp/util/byte_buffer.cc8
-rw-r--r--src/csharp/Grpc.Core/VersionInfo.cs2
-rw-r--r--src/node/ext/byte_buffer.cc4
-rw-r--r--src/node/index.js4
-rw-r--r--src/node/src/client.js4
-rw-r--r--src/node/src/credentials.js4
-rw-r--r--src/node/src/grpc_extension.js40
-rw-r--r--src/node/src/metadata.js2
-rw-r--r--src/node/src/server.js4
-rw-r--r--src/node/test/call_test.js4
-rw-r--r--src/node/test/channel_test.js4
-rw-r--r--src/node/test/constant_test.js4
-rw-r--r--src/node/test/end_to_end_test.js4
-rw-r--r--src/node/test/server_test.js4
-rw-r--r--src/node/test/surface_test.js14
-rw-r--r--src/proto/grpc/testing/control.proto3
-rw-r--r--src/proto/grpc/testing/echo_messages.proto3
-rw-r--r--src/python/grpcio/grpc/framework/foundation/logging_pool.py7
-rw-r--r--src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py26
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py50
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py55
-rw-r--r--src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py6
-rw-r--r--src/ruby/ext/grpc/rb_call.c6
-rw-r--r--src/ruby/ext/grpc/rb_call_credentials.c2
-rw-r--r--src/ruby/ext/grpc/rb_event_thread.c3
32 files changed, 229 insertions, 85 deletions
diff --git a/src/core/census/initialize.c b/src/core/census/initialize.c
index b7af714e0b..ce7ec09b89 100644
--- a/src/core/census/initialize.c
+++ b/src/core/census/initialize.c
@@ -37,9 +37,7 @@ static int features_enabled = CENSUS_FEATURE_NONE;
int census_initialize(int features) {
if (features_enabled != CENSUS_FEATURE_NONE) {
- return 1;
- }
- if (features == CENSUS_FEATURE_NONE) {
+ // Must have been a previous call to census_initialize; return error
return 1;
}
features_enabled = features;
diff --git a/src/core/client_config/lb_policies/pick_first.c b/src/core/client_config/lb_policies/pick_first.c
index e6ddb1a11f..5b10600ab5 100644
--- a/src/core/client_config/lb_policies/pick_first.c
+++ b/src/core/client_config/lb_policies/pick_first.c
@@ -76,7 +76,7 @@ typedef struct {
} pick_first_lb_policy;
#define GET_SELECTED(p) \
- ((grpc_connected_subchannel *)gpr_atm_no_barrier_load(&(p)->selected))
+ ((grpc_connected_subchannel *)gpr_atm_acq_load(&(p)->selected))
void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
@@ -268,10 +268,10 @@ static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
selected =
grpc_subchannel_get_connected_subchannel(selected_subchannel);
GPR_ASSERT(selected != NULL);
- gpr_atm_no_barrier_store(&p->selected, (gpr_atm)selected);
GRPC_CONNECTED_SUBCHANNEL_REF(selected, "picked_first");
/* drop the pick list: we are connected now */
GRPC_LB_POLICY_WEAK_REF(&p->base, "destroy_subchannels");
+ gpr_atm_rel_store(&p->selected, (gpr_atm)selected);
grpc_exec_ctx_enqueue(exec_ctx,
grpc_closure_create(destroy_subchannels, p), 1);
/* update any calls that were waiting for a pick */
diff --git a/src/core/client_config/subchannel.c b/src/core/client_config/subchannel.c
index 2992da8b79..748eef9bed 100644
--- a/src/core/client_config/subchannel.c
+++ b/src/core/client_config/subchannel.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -519,7 +519,12 @@ static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
}
/* publish */
- GPR_ASSERT(gpr_atm_no_barrier_cas(&c->connected_subchannel, 0, (gpr_atm)con));
+ /* TODO(ctiller): this full barrier seems to clear up a TSAN failure.
+ I'd have expected the rel_cas below to be enough, but
+ seemingly it's not.
+ Re-evaluate if we really need this. */
+ gpr_atm_full_barrier();
+ GPR_ASSERT(gpr_atm_rel_cas(&c->connected_subchannel, 0, (gpr_atm)con));
c->connecting = 0;
/* setup subchannel watching connected subchannel for changes; subchannel ref
diff --git a/src/core/surface/init.c b/src/core/surface/init.c
index 19cea4c4f6..e3ab70dba7 100644
--- a/src/core/surface/init.c
+++ b/src/core/surface/init.c
@@ -117,8 +117,10 @@ void grpc_init(void) {
grpc_iomgr_init();
grpc_executor_init();
grpc_tracer_init("GRPC_TRACE");
- /* Only initialize census if noone else has. */
- if (census_enabled() == CENSUS_FEATURE_NONE) {
+ /* Only initialize census if no one else has and some features are
+ * available. */
+ if (census_enabled() == CENSUS_FEATURE_NONE &&
+ census_supported() != CENSUS_FEATURE_NONE) {
if (census_initialize(census_supported())) { /* enable all features. */
gpr_log(GPR_ERROR, "Could not initialize census.");
}
diff --git a/src/core/surface/version.c b/src/core/surface/version.c
index aada18e07e..262a13f184 100644
--- a/src/core/surface/version.c
+++ b/src/core/surface/version.c
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index 2aa532808c..710d7cb5c2 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -48,6 +48,7 @@ namespace grpc {
class DefaultGlobalClientCallbacks GRPC_FINAL
: public ClientContext::GlobalCallbacks {
public:
+ ~DefaultGlobalClientCallbacks() GRPC_OVERRIDE {}
void DefaultConstructor(ClientContext* context) GRPC_OVERRIDE {}
void Destructor(ClientContext* context) GRPC_OVERRIDE {}
};
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 878775bbee..3bf9f3fa0f 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -53,17 +53,17 @@ namespace grpc {
class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
public:
+ ~DefaultGlobalCallbacks() GRPC_OVERRIDE {}
void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
};
-static Server::GlobalCallbacks* g_callbacks = nullptr;
+static std::shared_ptr<Server::GlobalCallbacks> g_callbacks = nullptr;
static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
static void InitGlobalCallbacks() {
if (g_callbacks == nullptr) {
- static DefaultGlobalCallbacks default_global_callbacks;
- g_callbacks = &default_global_callbacks;
+ g_callbacks.reset(new DefaultGlobalCallbacks());
}
}
@@ -234,12 +234,12 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
}
}
- void Run() {
+ void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
- g_callbacks->PreSynchronousRequest(&ctx_);
+ global_callbacks->PreSynchronousRequest(&ctx_);
method_->handler()->RunHandler(MethodHandler::HandlerParameter(
&call_, &ctx_, request_payload_, call_.max_message_size()));
- g_callbacks->PostSynchronousRequest(&ctx_);
+ global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
void* ignored_tag;
bool ignored_ok;
@@ -287,6 +287,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
+ global_callbacks_ = g_callbacks;
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
@@ -311,7 +312,7 @@ Server::~Server() {
void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
GPR_ASSERT(g_callbacks == nullptr);
GPR_ASSERT(callbacks != nullptr);
- g_callbacks = callbacks;
+ g_callbacks.reset(callbacks);
}
bool Server::RegisterService(const grpc::string* host, RpcService* service) {
@@ -569,7 +570,7 @@ void Server::RunRpc() {
}
}
GPR_TIMER_SCOPE("cd.Run()", 0);
- cd.Run();
+ cd.Run(global_callbacks_);
}
}
diff --git a/src/cpp/util/byte_buffer.cc b/src/cpp/util/byte_buffer.cc
index 2952f94b24..3a2318d1a6 100644
--- a/src/cpp/util/byte_buffer.cc
+++ b/src/cpp/util/byte_buffer.cc
@@ -31,8 +31,8 @@
*
*/
-#include <grpc/byte_buffer_reader.h>
#include <grpc++/support/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
namespace grpc {
@@ -84,8 +84,10 @@ ByteBuffer::ByteBuffer(const ByteBuffer& buf)
: buffer_(grpc_byte_buffer_copy(buf.buffer_)) {}
ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) {
- Clear(); // first remove existing data
- buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ Clear(); // first remove existing data
+ if (buf.buffer_) {
+ buffer_ = grpc_byte_buffer_copy(buf.buffer_); // then copy
+ }
return *this;
}
diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs
index 4c6d50356c..65813909de 100644
--- a/src/csharp/Grpc.Core/VersionInfo.cs
+++ b/src/csharp/Grpc.Core/VersionInfo.cs
@@ -1,6 +1,6 @@
#region Copyright notice and license
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
diff --git a/src/node/ext/byte_buffer.cc b/src/node/ext/byte_buffer.cc
index c306292c04..ee703fdc91 100644
--- a/src/node/ext/byte_buffer.cc
+++ b/src/node/ext/byte_buffer.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -69,7 +69,7 @@ Local<Value> ByteBufferToBuffer(grpc_byte_buffer *buffer) {
return scope.Escape(Nan::Null());
}
size_t length = grpc_byte_buffer_length(buffer);
- char *result = reinterpret_cast<char *>(calloc(length, sizeof(char)));
+ char *result = new char[length];
size_t offset = 0;
grpc_byte_buffer_reader reader;
grpc_byte_buffer_reader_init(&reader, buffer);
diff --git a/src/node/index.js b/src/node/index.js
index 0d1a7fd887..7eacdc67b1 100644
--- a/src/node/index.js
+++ b/src/node/index.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@ var server = require('./src/server.js');
var Metadata = require('./src/metadata.js');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./src/grpc_extension');
/**
* Load a gRPC object from an existing ProtoBuf.Reflect object.
diff --git a/src/node/src/client.js b/src/node/src/client.js
index d57826781d..b5247a69ee 100644
--- a/src/node/src/client.js
+++ b/src/node/src/client.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
var common = require('./common');
diff --git a/src/node/src/credentials.js b/src/node/src/credentials.js
index dcbfac18f4..710ab6d879 100644
--- a/src/node/src/credentials.js
+++ b/src/node/src/credentials.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -61,7 +61,7 @@
'use strict';
-var grpc = require('bindings')('grpc_node.node');
+var grpc = require('./grpc_extension');
var CallCredentials = grpc.CallCredentials;
diff --git a/src/node/src/grpc_extension.js b/src/node/src/grpc_extension.js
new file mode 100644
index 0000000000..d4eca65fcb
--- /dev/null
+++ b/src/node/src/grpc_extension.js
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+var binary = require('node-pre-gyp');
+var path = require('path');
+var binding_path = binary.find(path.resolve(
+ path.join(__dirname,'../../../package.json')));
+var binding = require(binding_path);
+
+module.exports = binding;
diff --git a/src/node/src/metadata.js b/src/node/src/metadata.js
index fef79f959e..51a9f8a216 100644
--- a/src/node/src/metadata.js
+++ b/src/node/src/metadata.js
@@ -49,7 +49,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
/**
* Class for storing metadata. Keys are normalized to lowercase ASCII.
diff --git a/src/node/src/server.js b/src/node/src/server.js
index ceaa9f5d1f..e5aadcd565 100644
--- a/src/node/src/server.js
+++ b/src/node/src/server.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -51,7 +51,7 @@
var _ = require('lodash');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('./grpc_extension');
var common = require('./common');
diff --git a/src/node/test/call_test.js b/src/node/test/call_test.js
index f1f86b35db..2300096d03 100644
--- a/src/node/test/call_test.js
+++ b/src/node/test/call_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* Helper function to return an absolute deadline given a relative timeout in
diff --git a/src/node/test/channel_test.js b/src/node/test/channel_test.js
index 7163a5fb5e..c0ae2b769a 100644
--- a/src/node/test/channel_test.js
+++ b/src/node/test/channel_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/constant_test.js b/src/node/test/constant_test.js
index b17cd339cb..712c70706d 100644
--- a/src/node/test/constant_test.js
+++ b/src/node/test/constant_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* List of all status names
diff --git a/src/node/test/end_to_end_test.js b/src/node/test/end_to_end_test.js
index 0f6c5941c4..353c6c761d 100644
--- a/src/node/test/end_to_end_test.js
+++ b/src/node/test/end_to_end_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -34,7 +34,7 @@
'use strict';
var assert = require('assert');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
/**
* This is used for testing functions with multiple asynchronous calls that
diff --git a/src/node/test/server_test.js b/src/node/test/server_test.js
index 592f47e145..71a9647184 100644
--- a/src/node/test/server_test.js
+++ b/src/node/test/server_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -36,7 +36,7 @@
var assert = require('assert');
var fs = require('fs');
var path = require('path');
-var grpc = require('bindings')('grpc_node');
+var grpc = require('../src/grpc_extension');
describe('server', function() {
describe('constructor', function() {
diff --git a/src/node/test/surface_test.js b/src/node/test/surface_test.js
index fc765ed731..c65d95f767 100644
--- a/src/node/test/surface_test.js
+++ b/src/node/test/surface_test.js
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015, Google Inc.
+ * Copyright 2015-2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -952,6 +952,7 @@ describe('Call propagation', function() {
describe('Cancellation', function() {
it('With a unary call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.unary = function(parent, callback) {
client.unary(parent.request, function(err, value) {
try {
@@ -969,12 +970,13 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.unary({}, function(err, value) {
+ call = proxy_client.unary({}, function(err, value) {
done();
});
});
it('With a client stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.clientStream = function(parent, callback) {
client.clientStream(function(err, value) {
try {
@@ -992,12 +994,13 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.clientStream(function(err, value) {
+ call = proxy_client.clientStream(function(err, value) {
done();
});
});
it('With a server stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.serverStream = function(parent) {
var child = client.serverStream(parent.request, null,
{parent: parent});
@@ -1013,13 +1016,14 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.serverStream({});
+ call = proxy_client.serverStream({});
call.on('error', function(err) {
done();
});
});
it('With a bidi stream call', function(done) {
done = multiDone(done, 2);
+ var call;
proxy_impl.bidiStream = function(parent) {
var child = client.bidiStream(null, {parent: parent});
child.on('error', function(err) {
@@ -1034,7 +1038,7 @@ describe('Call propagation', function() {
proxy.start();
var proxy_client = new Client('localhost:' + proxy_port,
grpc.credentials.createInsecure());
- var call = proxy_client.bidiStream();
+ call = proxy_client.bidiStream();
call.on('error', function(err) {
done();
});
diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto
index 0784ebf91c..7ba6f9856f 100644
--- a/src/proto/grpc/testing/control.proto
+++ b/src/proto/grpc/testing/control.proto
@@ -1,4 +1,4 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -42,6 +42,7 @@ enum ClientType {
enum ServerType {
SYNC_SERVER = 0;
ASYNC_SERVER = 1;
+ ASYNC_GENERIC_SERVER = 2;
}
enum RpcType {
diff --git a/src/proto/grpc/testing/echo_messages.proto b/src/proto/grpc/testing/echo_messages.proto
index f01d645af7..d05a35548d 100644
--- a/src/proto/grpc/testing/echo_messages.proto
+++ b/src/proto/grpc/testing/echo_messages.proto
@@ -1,5 +1,5 @@
-// Copyright 2015, Google Inc.
+// Copyright 2015-2016, Google Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
@@ -41,6 +41,7 @@ message RequestParams {
int32 response_message_length = 6;
bool echo_peer = 7;
string expected_client_identity = 8; // will force check_auth_context.
+ bool skip_cancelled_check = 9;
}
message EchoRequest {
diff --git a/src/python/grpcio/grpc/framework/foundation/logging_pool.py b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
index 7c7a6eebfc..f82c7f7fba 100644
--- a/src/python/grpcio/grpc/framework/foundation/logging_pool.py
+++ b/src/python/grpcio/grpc/framework/foundation/logging_pool.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,7 +29,6 @@
"""A thread pool that logs exceptions raised by tasks executed within it."""
-import functools
import logging
from concurrent import futures
@@ -37,12 +36,12 @@ from concurrent import futures
def _wrap(behavior):
"""Wraps an arbitrary callable behavior in exception-logging."""
- @functools.wraps(behavior)
def _wrapping(*args, **kwargs):
try:
return behavior(*args, **kwargs)
except Exception as e:
- logging.exception('Unexpected exception from task run in logging pool!')
+ logging.exception(
+ 'Unexpected exception from %s executed in logging pool!', behavior)
raise
return _wrapping
diff --git a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
index 452802da6a..0521e1c102 100644
--- a/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
+++ b/src/python/grpcio/tests/unit/framework/foundation/_logging_pool_test.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -29,6 +29,7 @@
"""Tests for grpc.framework.foundation.logging_pool."""
+import threading
import unittest
from grpc.framework.foundation import logging_pool
@@ -36,6 +37,21 @@ from grpc.framework.foundation import logging_pool
_POOL_SIZE = 16
+class _CallableObject(object):
+
+ def __init__(self):
+ self._lock = threading.Lock()
+ self._passed_values = []
+
+ def __call__(self, value):
+ with self._lock:
+ self._passed_values.append(value)
+
+ def passed_values(self):
+ with self._lock:
+ return tuple(self._passed_values)
+
+
class LoggingPoolTest(unittest.TestCase):
def testUpAndDown(self):
@@ -59,6 +75,14 @@ class LoggingPoolTest(unittest.TestCase):
self.assertIsNotNone(raised_exception)
+ def testCallableObjectExecuted(self):
+ callable_object = _CallableObject()
+ passed_object = object()
+ with logging_pool.pool(_POOL_SIZE) as pool:
+ future = pool.submit(callable_object, passed_object)
+ self.assertIsNone(future.result())
+ self.assertSequenceEqual((passed_object,), callable_object.passed_values())
+
if __name__ == '__main__':
unittest.main(verbosity=2)
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
index 3bcefa601d..c8a3a1bc74 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_blocking_invocation_inline_service.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -30,9 +30,12 @@
"""Test code for the Face layer of RPC Framework."""
import abc
+import itertools
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
+from grpc.framework.foundation import logging_pool
from grpc.framework.interfaces.face import face
from tests.unit.framework.common import test_constants
from tests.unit.framework.common import test_control
@@ -139,13 +142,50 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures = []
+ for _ in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures.append(response_future)
+
+ responses = [
+ response_future.result() for response_future in response_futures]
+
+ for request, response in zip(requests, responses):
+ test_messages.verify(request, response, self)
+ pool.shutdown(wait=True)
- @unittest.skip('Parallel invocations impossible with blocking control flow!')
def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ pool = logging_pool.pool(test_constants.PARALLELISM)
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ response_future = pool.submit(
+ self._invoker.blocking(group, method), request,
+ test_constants.LONG_TIMEOUT)
+ requests.append(request)
+ response_futures_to_indices[response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
@unittest.skip('Cancellation impossible with blocking control flow!')
def testCancelledUnaryRequestUnaryResponse(self):
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
index fc8daa992f..1d36a931e8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py
@@ -31,8 +31,10 @@
import abc
import contextlib
+import itertools
import threading
import unittest
+from concurrent import futures
# test_interfaces is referenced from specification in this module.
from grpc.framework.foundation import logging_pool
@@ -219,6 +221,23 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
test_messages.verify(second_request, second_response, self)
+ def testParallelInvocations(self):
+ for (group, method), test_messages_sequence in (
+ self._digest.unary_unary_messages_sequences.iteritems()):
+ for test_messages in test_messages_sequence:
+ first_request = test_messages.request()
+ second_request = test_messages.request()
+
+ first_response_future = self._invoker.future(group, method)(
+ first_request, test_constants.LONG_TIMEOUT)
+ second_response_future = self._invoker.future(group, method)(
+ second_request, test_constants.LONG_TIMEOUT)
+ first_response = first_response_future.result()
+ second_response = second_response_future.result()
+
+ test_messages.verify(first_request, first_response, self)
+ test_messages.verify(second_request, second_response, self)
+
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
@@ -237,26 +256,28 @@ class TestCase(test_coverage.Coverage, unittest.TestCase):
for request, response in zip(requests, responses):
test_messages.verify(request, response, self)
- def testParallelInvocations(self):
+ def testWaitingForSomeButNotAllParallelInvocations(self):
+ pool = logging_pool.pool(test_constants.PARALLELISM)
for (group, method), test_messages_sequence in (
self._digest.unary_unary_messages_sequences.iteritems()):
for test_messages in test_messages_sequence:
- first_request = test_messages.request()
- second_request = test_messages.request()
-
- first_response_future = self._invoker.future(group, method)(
- first_request, test_constants.LONG_TIMEOUT)
- second_response_future = self._invoker.future(group, method)(
- second_request, test_constants.LONG_TIMEOUT)
- first_response = first_response_future.result()
- second_response = second_response_future.result()
-
- test_messages.verify(first_request, first_response, self)
- test_messages.verify(second_request, second_response, self)
-
- @unittest.skip('TODO(nathaniel): implement.')
- def testWaitingForSomeButNotAllParallelInvocations(self):
- raise NotImplementedError()
+ requests = []
+ response_futures_to_indices = {}
+ for index in range(test_constants.PARALLELISM):
+ request = test_messages.request()
+ inner_response_future = self._invoker.future(group, method)(
+ request, test_constants.LONG_TIMEOUT)
+ outer_response_future = pool.submit(inner_response_future.result)
+ requests.append(request)
+ response_futures_to_indices[outer_response_future] = index
+
+ some_completed_response_futures_iterator = itertools.islice(
+ futures.as_completed(response_futures_to_indices),
+ test_constants.PARALLELISM / 2)
+ for response_future in some_completed_response_futures_iterator:
+ index = response_futures_to_indices[response_future]
+ test_messages.verify(requests[index], response_future.result(), self)
+ pool.shutdown(wait=True)
def testCancelledUnaryRequestUnaryResponse(self):
for (group, method), test_messages_sequence in (
diff --git a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
index 2e444ff09d..42a7f4e3b8 100644
--- a/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
+++ b/src/python/grpcio/tests/unit/framework/interfaces/face/_receiver.py
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -76,7 +76,7 @@ class Receiver(face.ResponseReceiver):
def unary_response(self):
with self._condition:
if self._abortion is not None:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
elif len(self._responses) != 1:
raise AssertionError(
'%d responses received, not exactly one!', len(self._responses))
@@ -88,7 +88,7 @@ class Receiver(face.ResponseReceiver):
if self._abortion is None:
return list(self._responses)
else:
- raise AssertionError('Aborted with abortion "%s"!' % self._abortion)
+ raise AssertionError('Aborted: "{}"!'.format(self._abortion))
def abortion(self):
with self._condition:
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 43adafb73f..8f33d72417 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -525,12 +525,12 @@ typedef struct run_batch_stack {
grpc_status_code recv_status;
char *recv_status_details;
size_t recv_status_details_capacity;
- uint write_flag;
+ unsigned write_flag;
} run_batch_stack;
/* grpc_run_batch_stack_init ensures the run_batch_stack is properly
* initialized */
-static void grpc_run_batch_stack_init(run_batch_stack *st, uint write_flag) {
+static void grpc_run_batch_stack_init(run_batch_stack *st, unsigned write_flag) {
MEMZERO(st, run_batch_stack, 1);
grpc_metadata_array_init(&st->send_metadata);
grpc_metadata_array_init(&st->send_trailing_metadata);
@@ -696,7 +696,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
grpc_call_error err;
VALUE result = Qnil;
VALUE rb_write_flag = rb_ivar_get(self, id_write_flag);
- uint write_flag = 0;
+ unsigned write_flag = 0;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
/* Validate the ops args, adding them to a ruby array */
diff --git a/src/ruby/ext/grpc/rb_call_credentials.c b/src/ruby/ext/grpc/rb_call_credentials.c
index 4d719d7541..ebcc6592c2 100644
--- a/src/ruby/ext/grpc/rb_call_credentials.c
+++ b/src/ruby/ext/grpc/rb_call_credentials.c
@@ -79,6 +79,7 @@ static VALUE grpc_rb_call_credentials_callback(VALUE callback_args) {
static VALUE grpc_rb_call_credentials_callback_rescue(VALUE args,
VALUE exception_object) {
VALUE result = rb_hash_new();
+ (void)args;
rb_hash_aset(result, rb_str_new2("metadata"), Qnil);
/* Currently only gives the exception class name. It should be possible get
more details */
@@ -132,6 +133,7 @@ static void grpc_rb_call_credentials_plugin_get_metadata(
}
static void grpc_rb_call_credentials_plugin_destroy(void *state) {
+ (void)state;
// Not sure what needs to be done here
}
diff --git a/src/ruby/ext/grpc/rb_event_thread.c b/src/ruby/ext/grpc/rb_event_thread.c
index 95af091317..516f0bdad2 100644
--- a/src/ruby/ext/grpc/rb_event_thread.c
+++ b/src/ruby/ext/grpc/rb_event_thread.c
@@ -102,6 +102,7 @@ static void grpc_rb_event_queue_destroy() {
static void *grpc_rb_wait_for_event_no_gil(void *param) {
grpc_rb_event *event = NULL;
+ (void)param;
gpr_mu_lock(&event_queue.mu);
while ((event = grpc_rb_event_queue_dequeue()) == NULL) {
gpr_cv_wait(&event_queue.cv,
@@ -117,6 +118,7 @@ static void *grpc_rb_wait_for_event_no_gil(void *param) {
}
static void grpc_rb_event_unblocking_func(void *arg) {
+ (void)arg;
gpr_mu_lock(&event_queue.mu);
event_queue.abort = true;
gpr_cv_signal(&event_queue.cv);
@@ -127,6 +129,7 @@ static void grpc_rb_event_unblocking_func(void *arg) {
* events */
static VALUE grpc_rb_event_thread(VALUE arg) {
grpc_rb_event *event;
+ (void)arg;
while(true) {
event = (grpc_rb_event*)rb_thread_call_without_gvl(
grpc_rb_wait_for_event_no_gil, NULL,