aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Robbie Shade <robbie.shade@gmail.com>2015-08-13 19:07:17 -0400
committerGravatar Robbie Shade <robbie.shade@gmail.com>2015-08-13 19:07:17 -0400
commit5c45b940e7c49cca3bffea3effce5271f644c3de (patch)
tree3116f77759522bbe7013313029a430db66c1bf59 /src
parent00436a256dd81b10df5da1154f48b9f81056824b (diff)
parentc49a48bd7545466d6535b0a548854ce8954ea045 (diff)
Merge remote-tracking branch 'upstream/master' into add_udp_server_2
Diffstat (limited to 'src')
-rw-r--r--src/core/surface/call.c17
-rw-r--r--src/core/surface/channel.c9
-rw-r--r--src/core/surface/channel_create.c4
-rw-r--r--src/core/surface/completion_queue.c10
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/surface/server_create.c3
-rw-r--r--src/core/transport/stream_op.c1
-rw-r--r--src/core/transport/stream_op.h1
-rw-r--r--src/cpp/client/channel.cc9
-rw-r--r--src/cpp/client/client_context.cc8
-rw-r--r--src/cpp/client/insecure_credentials.cc2
-rw-r--r--src/cpp/common/completion_queue.cc14
-rw-r--r--src/cpp/server/server.cc56
-rw-r--r--src/cpp/server/server_builder.cc10
-rw-r--r--src/cpp/server/server_context.cc1
-rw-r--r--src/csharp/ext/grpc_csharp_ext.c47
-rw-r--r--src/node/ext/call.cc8
-rw-r--r--src/node/ext/channel.cc5
-rw-r--r--src/node/ext/completion_queue_async_worker.cc4
-rw-r--r--src/node/ext/server.cc12
-rw-r--r--src/objective-c/GRPCClient/private/GRPCCompletionQueue.m5
-rw-r--r--src/objective-c/GRPCClient/private/GRPCHost.m2
-rw-r--r--src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m2
-rw-r--r--src/objective-c/GRPCClient/private/GRPCWrappedCall.m4
-rw-r--r--src/php/ext/grpc/call.c9
-rw-r--r--src/php/ext/grpc/channel.c6
-rw-r--r--src/php/ext/grpc/completion_queue.c7
-rw-r--r--src/php/ext/grpc/server.c11
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/call.c6
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/channel.c4
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c4
-rw-r--r--src/python/grpcio/grpc/_adapter/_c/types/server.c4
-rw-r--r--src/ruby/ext/grpc/rb_call.c4
-rw-r--r--src/ruby/ext/grpc/rb_channel.c4
-rw-r--r--src/ruby/ext/grpc/rb_completion_queue.c6
-rw-r--r--src/ruby/ext/grpc/rb_server.c4
36 files changed, 190 insertions, 117 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 5839d3ac2e..6a1a6cbf30 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -964,7 +964,7 @@ static void call_on_done_recv(void *pc, int success) {
next_child_call = child_call->sibling_next;
if (child_call->cancellation_is_inherited) {
GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
- grpc_call_cancel(child_call);
+ grpc_call_cancel(child_call, NULL);
GRPC_CALL_INTERNAL_UNREF(child_call, "propagate_cancel", 0);
}
child_call = next_child_call;
@@ -1265,18 +1265,22 @@ void grpc_call_destroy(grpc_call *c) {
c->cancel_alarm |= c->have_alarm;
cancel = c->read_state != READ_STATE_STREAM_CLOSED;
unlock(c);
- if (cancel) grpc_call_cancel(c);
+ if (cancel) grpc_call_cancel(c, NULL);
GRPC_CALL_INTERNAL_UNREF(c, "destroy", 1);
}
-grpc_call_error grpc_call_cancel(grpc_call *call) {
- return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled");
+grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
+ GPR_ASSERT(!reserved);
+ return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
+ NULL);
}
grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
grpc_status_code status,
- const char *description) {
+ const char *description,
+ void *reserved) {
grpc_call_error r;
+ (void) reserved;
lock(c);
r = cancel_with_status(c, status, description);
unlock(c);
@@ -1513,13 +1517,14 @@ static int are_write_flags_valid(gpr_uint32 flags) {
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
- size_t nops, void *tag) {
+ size_t nops, void *tag, void *reserved) {
grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
size_t in;
size_t out;
const grpc_op *op;
grpc_ioreq *req;
void (*finish_func)(grpc_call *, int, void *) = finish_batch;
+ GPR_ASSERT(!reserved);
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
diff --git a/src/core/surface/channel.c b/src/core/surface/channel.c
index 8692aa3903..308572c634 100644
--- a/src/core/surface/channel.c
+++ b/src/core/surface/channel.c
@@ -168,7 +168,8 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
gpr_uint32 propagation_mask,
grpc_completion_queue *cq,
const char *method, const char *host,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
+ GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, cq,
grpc_mdelem_from_metadata_strings(
@@ -182,8 +183,9 @@ grpc_call *grpc_channel_create_call(grpc_channel *channel,
}
void *grpc_channel_register_call(grpc_channel *channel, const char *method,
- const char *host) {
+ const char *host, void *reserved) {
registered_call *rc = gpr_malloc(sizeof(registered_call));
+ GPR_ASSERT(!reserved);
rc->path = grpc_mdelem_from_metadata_strings(
channel->metadata_context, GRPC_MDSTR_REF(channel->path_string),
grpc_mdstr_from_string(channel->metadata_context, method, 0));
@@ -200,8 +202,9 @@ void *grpc_channel_register_call(grpc_channel *channel, const char *method,
grpc_call *grpc_channel_create_registered_call(
grpc_channel *channel, grpc_call *parent_call, gpr_uint32 propagation_mask,
grpc_completion_queue *completion_queue, void *registered_call_handle,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
registered_call *rc = registered_call_handle;
+ GPR_ASSERT(!reserved);
return grpc_channel_create_call_internal(
channel, parent_call, propagation_mask, completion_queue,
GRPC_MDELEM_REF(rc->path),
diff --git a/src/core/surface/channel_create.c b/src/core/surface/channel_create.c
index 707d615688..82ddfac757 100644
--- a/src/core/surface/channel_create.c
+++ b/src/core/surface/channel_create.c
@@ -155,7 +155,8 @@ static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
- connect to it (trying alternatives as presented)
- perform handshakes */
grpc_channel *grpc_insecure_channel_create(const char *target,
- const grpc_channel_args *args) {
+ const grpc_channel_args *args,
+ void *reserved) {
grpc_channel *channel = NULL;
#define MAX_FILTERS 3
const grpc_channel_filter *filters[MAX_FILTERS];
@@ -163,6 +164,7 @@ grpc_channel *grpc_insecure_channel_create(const char *target,
subchannel_factory *f;
grpc_mdctx *mdctx = grpc_mdctx_create();
int n = 0;
+ GPR_ASSERT(!reserved);
/* TODO(census)
if (grpc_channel_args_is_census_enabled(args)) {
filters[n++] = &grpc_client_census_filter;
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 36d69cfe5f..378b3f71a1 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -69,8 +69,9 @@ struct grpc_completion_queue {
plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
};
-grpc_completion_queue *grpc_completion_queue_create(void) {
+grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
+ GPR_ASSERT(!reserved);
memset(cc, 0, sizeof(*cc));
/* Initial ref is dropped by grpc_completion_queue_shutdown */
gpr_ref_init(&cc->pending_events, 1);
@@ -166,9 +167,11 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, int success,
}
grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
- gpr_timespec deadline) {
+ gpr_timespec deadline,
+ void *reserved) {
grpc_event ret;
grpc_pollset_worker worker;
+ GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
@@ -232,11 +235,12 @@ static void del_plucker(grpc_completion_queue *cc, void *tag,
}
grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
- gpr_timespec deadline) {
+ gpr_timespec deadline, void *reserved) {
grpc_event ret;
grpc_cq_completion *c;
grpc_cq_completion *prev;
grpc_pollset_worker worker;
+ GPR_ASSERT(!reserved);
deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index cd1dc589e1..f883275951 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -761,8 +761,10 @@ static const grpc_channel_filter server_surface_filter = {
};
void grpc_server_register_completion_queue(grpc_server *server,
- grpc_completion_queue *cq) {
+ grpc_completion_queue *cq,
+ void *reserved) {
size_t i, n;
+ GPR_ASSERT(!reserved);
for (i = 0; i < server->cq_count; i++) {
if (server->cqs[i] == cq) return;
}
diff --git a/src/core/surface/server_create.c b/src/core/surface/server_create.c
index 1e26c67693..9237eb5a90 100644
--- a/src/core/surface/server_create.c
+++ b/src/core/surface/server_create.c
@@ -36,8 +36,9 @@
#include "src/core/surface/server.h"
#include "src/core/channel/compress_filter.h"
-grpc_server *grpc_server_create(const grpc_channel_args *args) {
+grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) {
const grpc_channel_filter *filters[] = {&grpc_compress_filter};
+ (void) reserved;
return grpc_server_create_from_filters(filters, GPR_ARRAY_SIZE(filters),
args);
}
diff --git a/src/core/transport/stream_op.c b/src/core/transport/stream_op.c
index a5dfec9d50..0a9669b0ab 100644
--- a/src/core/transport/stream_op.c
+++ b/src/core/transport/stream_op.c
@@ -258,6 +258,7 @@ static void link_tail(grpc_mdelem_list *list, grpc_linked_mdelem *storage) {
GPR_ASSERT(storage->md);
storage->prev = list->tail;
storage->next = NULL;
+ storage->reserved = NULL;
if (list->tail != NULL) {
list->tail->next = storage;
} else {
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 227320cf2a..37f18b02d9 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -77,6 +77,7 @@ typedef struct grpc_linked_mdelem {
grpc_mdelem *md;
struct grpc_linked_mdelem *next;
struct grpc_linked_mdelem *prev;
+ void *reserved;
} grpc_linked_mdelem;
typedef struct grpc_mdelem_list {
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 1c2eecf786..0582b59a6d 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -67,7 +67,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
c_call = grpc_channel_create_registered_call(
c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(), cq->cq(),
- method.channel_tag(), context->raw_deadline());
+ method.channel_tag(), context->raw_deadline(), nullptr);
} else {
const char* host_str = NULL;
if (!context->authority().empty()) {
@@ -78,7 +78,7 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
c_call = grpc_channel_create_call(c_channel_, context->propagate_from_call_,
context->propagation_options_.c_bitmask(),
cq->cq(), method.name(), host_str,
- context->raw_deadline());
+ context->raw_deadline(), nullptr);
}
grpc_census_call_set_context(c_call, context->census_context());
GRPC_TIMER_MARK(GRPC_PTAG_CPP_CALL_CREATED, c_call);
@@ -93,13 +93,14 @@ void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
GRPC_TIMER_BEGIN(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
ops->FillOps(cops, &nops);
GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops));
+ grpc_call_start_batch(call->call(), cops, nops, ops, nullptr));
GRPC_TIMER_END(GRPC_PTAG_CPP_PERFORM_OPS, call->call());
}
void* Channel::RegisterMethod(const char* method) {
return grpc_channel_register_call(c_channel_, method,
- host_.empty() ? NULL : host_.c_str());
+ host_.empty() ? NULL : host_.c_str(),
+ nullptr);
}
grpc_connectivity_state Channel::GetState(bool try_to_connect) {
diff --git a/src/cpp/client/client_context.cc b/src/cpp/client/client_context.cc
index dd86e7b108..b8caa1eae4 100644
--- a/src/cpp/client/client_context.cc
+++ b/src/cpp/client/client_context.cc
@@ -77,19 +77,19 @@ void ClientContext::set_call(grpc_call* call,
channel_ = channel;
if (creds_ && !creds_->ApplyToCall(call_)) {
grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED,
- "Failed to set credentials to rpc.");
+ "Failed to set credentials to rpc.", nullptr);
}
}
void ClientContext::set_compression_algorithm(
grpc_compression_algorithm algorithm) {
- char* algorithm_name = NULL;
+ char* algorithm_name = nullptr;
if (!grpc_compression_algorithm_name(algorithm, &algorithm_name)) {
gpr_log(GPR_ERROR, "Name for compression algorithm '%d' unknown.",
algorithm);
abort();
}
- GPR_ASSERT(algorithm_name != NULL);
+ GPR_ASSERT(algorithm_name != nullptr);
AddMetadata(GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, algorithm_name);
}
@@ -102,7 +102,7 @@ std::shared_ptr<const AuthContext> ClientContext::auth_context() const {
void ClientContext::TryCancel() {
if (call_) {
- grpc_call_cancel(call_);
+ grpc_call_cancel(call_, nullptr);
}
}
diff --git a/src/cpp/client/insecure_credentials.cc b/src/cpp/client/insecure_credentials.cc
index d8dcaa1436..2f9357b568 100644
--- a/src/cpp/client/insecure_credentials.cc
+++ b/src/cpp/client/insecure_credentials.cc
@@ -49,7 +49,7 @@ class InsecureCredentialsImpl GRPC_FINAL : public Credentials {
grpc_channel_args channel_args;
args.SetChannelArgs(&channel_args);
return std::shared_ptr<ChannelInterface>(new Channel(
- grpc_insecure_channel_create(target.c_str(), &channel_args)));
+ grpc_insecure_channel_create(target.c_str(), &channel_args, nullptr)));
}
// InsecureCredentials should not be applied to a call.
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 593963f672..fca33f8f54 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -40,7 +40,9 @@
namespace grpc {
-CompletionQueue::CompletionQueue() { cq_ = grpc_completion_queue_create(); }
+CompletionQueue::CompletionQueue() {
+ cq_ = grpc_completion_queue_create(nullptr);
+}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
@@ -51,7 +53,7 @@ void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
for (;;) {
- auto ev = grpc_completion_queue_next(cq_, deadline);
+ auto ev = grpc_completion_queue_next(cq_, deadline, nullptr);
switch (ev.type) {
case GRPC_QUEUE_TIMEOUT:
return TIMEOUT;
@@ -70,8 +72,8 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
- auto ev =
- grpc_completion_queue_pluck(cq_, tag, gpr_inf_future(GPR_CLOCK_REALTIME));
+ auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
+ auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
bool ok = ev.success != 0;
void* ignored = tag;
GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
@@ -81,8 +83,8 @@ bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
}
void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
- auto ev =
- grpc_completion_queue_pluck(cq_, tag, gpr_time_0(GPR_CLOCK_REALTIME));
+ auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
+ auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT) return;
bool ok = ev.success != 0;
void* ignored = tag;
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index ab87b22f5f..90f3854a72 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -67,11 +67,17 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
method->method_type() ==
RpcMethod::SERVER_STREAMING),
+ call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
}
- ~SyncRequest() { grpc_metadata_array_destroy(&request_metadata_); }
+ ~SyncRequest() {
+ if (call_details_) {
+ delete call_details_;
+ }
+ grpc_metadata_array_destroy(&request_metadata_);
+ }
static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
void* tag = nullptr;
@@ -84,7 +90,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
return mrd;
}
- void SetupRequest() { cq_ = grpc_completion_queue_create(); }
+ void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
void TeardownRequest() {
grpc_completion_queue_destroy(cq_);
@@ -94,17 +100,32 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
GPR_ASSERT(cq_ && !in_flight_);
in_flight_ = true;
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_server_request_registered_call(
- server, tag_, &call_, &deadline_, &request_metadata_,
- has_request_payload_ ? &request_payload_ : nullptr, cq_,
- notify_cq, this));
+ if (tag_) {
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_server_request_registered_call(
+ server, tag_, &call_, &deadline_, &request_metadata_,
+ has_request_payload_ ? &request_payload_ : nullptr, cq_,
+ notify_cq, this));
+ } else {
+ if (!call_details_) {
+ call_details_ = new grpc_call_details;
+ grpc_call_details_init(call_details_);
+ }
+ GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
+ server, &call_, call_details_,
+ &request_metadata_, cq_, notify_cq, this));
+ }
}
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
if (!*status) {
grpc_completion_queue_destroy(cq_);
}
+ if (call_details_) {
+ deadline_ = call_details_->deadline;
+ grpc_call_details_destroy(call_details_);
+ grpc_call_details_init(call_details_);
+ }
return true;
}
@@ -157,6 +178,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
bool in_flight_;
const bool has_request_payload_;
grpc_call* call_;
+ grpc_call_details* call_details_;
gpr_timespec deadline_;
grpc_metadata_array request_metadata_;
grpc_byte_buffer* request_payload_;
@@ -170,9 +192,9 @@ static grpc_server* CreateServer(int max_message_size) {
arg.key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
arg.value.integer = max_message_size;
grpc_channel_args args = {1, &arg};
- return grpc_server_create(&args);
+ return grpc_server_create(&args, nullptr);
} else {
- return grpc_server_create(nullptr);
+ return grpc_server_create(nullptr, nullptr);
}
}
@@ -183,10 +205,11 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
+ has_generic_service_(false),
server_(CreateServer(max_message_size)),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
- grpc_server_register_completion_queue(server_, cq_.cq());
+ grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
Server::~Server() {
@@ -223,7 +246,8 @@ bool Server::RegisterService(const grpc::string *host, RpcService* service) {
return true;
}
-bool Server::RegisterAsyncService(const grpc::string *host, AsynchronousService* service) {
+bool Server::RegisterAsyncService(const grpc::string* host,
+ AsynchronousService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an asynchronous service against one server.");
service->server_ = this;
@@ -245,6 +269,7 @@ void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
GPR_ASSERT(service->server_ == nullptr &&
"Can only register an async generic service against one server.");
service->server_ = this;
+ has_generic_service_ = true;
}
int Server::AddListeningPort(const grpc::string& addr,
@@ -258,6 +283,11 @@ bool Server::Start() {
started_ = true;
grpc_server_start(server_);
+ if (!has_generic_service_) {
+ unknown_method_.reset(new RpcServiceMethod(
+ "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ sync_methods_->emplace_back(unknown_method_.get(), nullptr);
+ }
// Start processing rpcs.
if (!sync_methods_->empty()) {
for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
@@ -297,8 +327,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
size_t nops = 0;
grpc_op cops[MAX_OPS];
ops->FillOps(cops, &nops);
- GPR_ASSERT(GRPC_CALL_OK ==
- grpc_call_start_batch(call->call(), cops, nops, ops));
+ auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
+ GPR_ASSERT(GRPC_CALL_OK == result);
}
Server::BaseAsyncRequest::BaseAsyncRequest(
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index f723d4611a..09118879f4 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -38,6 +38,7 @@
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
#include <grpc++/thread_pool_interface.h>
+#include <grpc++/fixed_size_thread_pool.h>
namespace grpc {
@@ -100,10 +101,17 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
thread_pool_ = CreateDefaultThreadPool();
thread_pool_owned = true;
}
+ // Async services only, create a thread pool to handle requests to unknown
+ // services.
+ if (!thread_pool_ && !generic_service_ && !async_services_.empty()) {
+ thread_pool_ = new FixedSizeThreadPool(1);
+ thread_pool_owned = true;
+ }
std::unique_ptr<Server> server(
new Server(thread_pool_, thread_pool_owned, max_message_size_));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
- grpc_server_register_completion_queue(server->server_, (*cq)->cq());
+ grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
+ nullptr);
}
for (auto service = services_.begin(); service != services_.end();
service++) {
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index 04373397f9..bb34040a2f 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -91,6 +91,7 @@ void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
ops->flags = 0;
+ ops->reserved = NULL;
*nops = 1;
}
diff --git a/src/csharp/ext/grpc_csharp_ext.c b/src/csharp/ext/grpc_csharp_ext.c
index 9379ae01f1..bf2bbd873b 100644
--- a/src/csharp/ext/grpc_csharp_ext.c
+++ b/src/csharp/ext/grpc_csharp_ext.c
@@ -339,7 +339,7 @@ GPR_EXPORT void GPR_CALLTYPE grpcsharp_shutdown(void) { grpc_shutdown(); }
GPR_EXPORT grpc_completion_queue *GPR_CALLTYPE
grpcsharp_completion_queue_create(void) {
- return grpc_completion_queue_create();
+ return grpc_completion_queue_create(NULL);
}
GPR_EXPORT void GPR_CALLTYPE
@@ -354,13 +354,14 @@ grpcsharp_completion_queue_destroy(grpc_completion_queue *cq) {
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_next(grpc_completion_queue *cq) {
- return grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME));
+ return grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
}
GPR_EXPORT grpc_event GPR_CALLTYPE
grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
return grpc_completion_queue_pluck(cq, tag,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
/* Channel */
@@ -368,7 +369,7 @@ grpcsharp_completion_queue_pluck(grpc_completion_queue *cq, void *tag) {
GPR_EXPORT grpc_channel *GPR_CALLTYPE
grpcsharp_insecure_channel_create(const char *target, const grpc_channel_args *args) {
- return grpc_insecure_channel_create(target, args);
+ return grpc_insecure_channel_create(target, args, NULL);
}
GPR_EXPORT void GPR_CALLTYPE grpcsharp_channel_destroy(grpc_channel *channel) {
@@ -382,7 +383,7 @@ grpcsharp_channel_create_call(grpc_channel *channel, grpc_call *parent_call,
const char *method, const char *host,
gpr_timespec deadline) {
return grpc_channel_create_call(channel, parent_call, propagation_mask, cq,
- method, host, deadline);
+ method, host, deadline, NULL);
}
GPR_EXPORT grpc_connectivity_state GPR_CALLTYPE
@@ -475,13 +476,13 @@ GPR_EXPORT gpr_int32 GPR_CALLTYPE gprsharp_sizeof_timespec(void) {
/* Call */
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_cancel(grpc_call *call) {
- return grpc_call_cancel(call);
+ return grpc_call_cancel(call, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
grpcsharp_call_cancel_with_status(grpc_call *call, grpc_status_code status,
const char *description) {
- return grpc_call_cancel_with_status(call, status, description);
+ return grpc_call_cancel_with_status(call, status, description, NULL);
}
GPR_EXPORT char *GPR_CALLTYPE grpcsharp_call_get_peer(grpc_call *call) {
@@ -538,7 +539,8 @@ grpcsharp_call_start_unary(grpc_call *call, grpcsharp_batch_context *ctx,
&(ctx->recv_status_on_client.status_details_capacity);
ops[5].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -575,7 +577,8 @@ grpcsharp_call_start_client_streaming(grpc_call *call,
&(ctx->recv_status_on_client.status_details_capacity);
ops[3].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
@@ -615,7 +618,8 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_start_server_streaming(
&(ctx->recv_status_on_client.status_details_capacity);
ops[4].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -648,7 +652,8 @@ grpcsharp_call_start_duplex_streaming(grpc_call *call,
&(ctx->recv_status_on_client.status_details_capacity);
ops[2].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -668,7 +673,7 @@ grpcsharp_call_send_message(grpc_call *call, grpcsharp_batch_context *ctx,
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, nops, ctx);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -679,7 +684,8 @@ grpcsharp_call_send_close_from_client(grpc_call *call,
ops[0].op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
ops[0].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
@@ -705,7 +711,7 @@ GPR_EXPORT grpc_call_error GPR_CALLTYPE grpcsharp_call_send_status_from_server(
ops[1].data.send_initial_metadata.metadata = NULL;
ops[1].flags = 0;
- return grpc_call_start_batch(call, ops, nops, ctx);
+ return grpc_call_start_batch(call, ops, nops, ctx, NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -715,7 +721,8 @@ grpcsharp_call_recv_message(grpc_call *call, grpcsharp_batch_context *ctx) {
ops[0].op = GRPC_OP_RECV_MESSAGE;
ops[0].data.recv_message = &(ctx->recv_message);
ops[0].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -727,7 +734,8 @@ grpcsharp_call_start_serverside(grpc_call *call, grpcsharp_batch_context *ctx) {
(&ctx->recv_close_on_server_cancelled);
ops[0].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
GPR_EXPORT grpc_call_error GPR_CALLTYPE
@@ -744,7 +752,8 @@ grpcsharp_call_send_initial_metadata(grpc_call *call,
ctx->send_initial_metadata.metadata;
ops[0].flags = 0;
- return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx);
+ return grpc_call_start_batch(call, ops, sizeof(ops) / sizeof(ops[0]), ctx,
+ NULL);
}
/* Server */
@@ -752,8 +761,8 @@ grpcsharp_call_send_initial_metadata(grpc_call *call,
GPR_EXPORT grpc_server *GPR_CALLTYPE
grpcsharp_server_create(grpc_completion_queue *cq,
const grpc_channel_args *args) {
- grpc_server *server = grpc_server_create(args);
- grpc_server_register_completion_queue(server, cq);
+ grpc_server *server = grpc_server_create(args, NULL);
+ grpc_server_register_completion_queue(server, cq, NULL);
return server;
}
diff --git a/src/node/ext/call.cc b/src/node/ext/call.cc
index c5c8313385..6fc1bc424f 100644
--- a/src/node/ext/call.cc
+++ b/src/node/ext/call.cc
@@ -516,12 +516,12 @@ NAN_METHOD(Call::New) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method,
- *host_override, MillisecondsToTimespec(deadline));
+ *host_override, MillisecondsToTimespec(deadline), NULL);
} else if (args[3]->IsUndefined() || args[3]->IsNull()) {
wrapped_call = grpc_channel_create_call(
wrapped_channel, NULL, GRPC_PROPAGATE_DEFAULTS,
CompletionQueueAsyncWorker::GetQueue(), *method,
- NULL, MillisecondsToTimespec(deadline));
+ NULL, MillisecondsToTimespec(deadline), NULL);
} else {
return NanThrowTypeError("Call's fourth argument must be a string");
}
@@ -601,7 +601,7 @@ NAN_METHOD(Call::StartBatch) {
NanCallback *callback = new NanCallback(callback_func);
grpc_call_error error = grpc_call_start_batch(
call->wrapped_call, &ops[0], nops, new struct tag(
- callback, op_vector.release(), resources));
+ callback, op_vector.release(), resources), NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("startBatch failed", error);
}
@@ -615,7 +615,7 @@ NAN_METHOD(Call::Cancel) {
return NanThrowTypeError("cancel can only be called on Call objects");
}
Call *call = ObjectWrap::Unwrap<Call>(args.This());
- grpc_call_error error = grpc_call_cancel(call->wrapped_call);
+ grpc_call_error error = grpc_call_cancel(call->wrapped_call, NULL);
if (error != GRPC_CALL_OK) {
return NanThrowError("cancel failed", error);
}
diff --git a/src/node/ext/channel.cc b/src/node/ext/channel.cc
index 457a58c057..45d0d09e22 100644
--- a/src/node/ext/channel.cc
+++ b/src/node/ext/channel.cc
@@ -111,7 +111,7 @@ NAN_METHOD(Channel::New) {
grpc_channel_args *channel_args_ptr;
if (args[2]->IsUndefined()) {
channel_args_ptr = NULL;
- wrapped_channel = grpc_insecure_channel_create(*host, NULL);
+ wrapped_channel = grpc_insecure_channel_create(*host, NULL, NULL);
} else if (args[2]->IsObject()) {
Handle<Object> args_hash(args[2]->ToObject()->Clone());
Handle<Array> keys(args_hash->GetOwnPropertyNames());
@@ -145,7 +145,8 @@ NAN_METHOD(Channel::New) {
return NanThrowTypeError("Channel expects a string and an object");
}
if (creds == NULL) {
- wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr);
+ wrapped_channel = grpc_insecure_channel_create(*host, channel_args_ptr,
+ NULL);
} else {
wrapped_channel =
grpc_secure_channel_create(creds, *host, channel_args_ptr);
diff --git a/src/node/ext/completion_queue_async_worker.cc b/src/node/ext/completion_queue_async_worker.cc
index 1215c97e19..4501e848ae 100644
--- a/src/node/ext/completion_queue_async_worker.cc
+++ b/src/node/ext/completion_queue_async_worker.cc
@@ -63,7 +63,7 @@ CompletionQueueAsyncWorker::~CompletionQueueAsyncWorker() {}
void CompletionQueueAsyncWorker::Execute() {
result =
- grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME));
+ grpc_completion_queue_next(queue, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (!result.success) {
SetErrorMessage("The batch encountered an error");
}
@@ -85,7 +85,7 @@ void CompletionQueueAsyncWorker::Init(Handle<Object> exports) {
NanScope();
current_threads = 0;
waiting_next_calls = 0;
- queue = grpc_completion_queue_create();
+ queue = grpc_completion_queue_create(NULL);
}
void CompletionQueueAsyncWorker::HandleOKCallback() {
diff --git a/src/node/ext/server.cc b/src/node/ext/server.cc
index 1dc179db3d..8e39644846 100644
--- a/src/node/ext/server.cc
+++ b/src/node/ext/server.cc
@@ -113,8 +113,8 @@ class NewCallOp : public Op {
};
Server::Server(grpc_server *server) : wrapped_server(server) {
- shutdown_queue = grpc_completion_queue_create();
- grpc_server_register_completion_queue(server, shutdown_queue);
+ shutdown_queue = grpc_completion_queue_create(NULL);
+ grpc_server_register_completion_queue(server, shutdown_queue, NULL);
}
Server::~Server() {
@@ -158,7 +158,7 @@ void Server::ShutdownServer() {
this->shutdown_queue,
NULL);
grpc_completion_queue_pluck(this->shutdown_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
this->wrapped_server = NULL;
}
}
@@ -176,7 +176,7 @@ NAN_METHOD(Server::New) {
grpc_server *wrapped_server;
grpc_completion_queue *queue = CompletionQueueAsyncWorker::GetQueue();
if (args[0]->IsUndefined()) {
- wrapped_server = grpc_server_create(NULL);
+ wrapped_server = grpc_server_create(NULL, NULL);
} else if (args[0]->IsObject()) {
Handle<Object> args_hash(args[0]->ToObject());
Handle<Array> keys(args_hash->GetOwnPropertyNames());
@@ -205,12 +205,12 @@ NAN_METHOD(Server::New) {
return NanThrowTypeError("Arg values must be strings");
}
}
- wrapped_server = grpc_server_create(&channel_args);
+ wrapped_server = grpc_server_create(&channel_args, NULL);
free(channel_args.args);
} else {
return NanThrowTypeError("Server expects an object");
}
- grpc_server_register_completion_queue(wrapped_server, queue);
+ grpc_server_register_completion_queue(wrapped_server, queue, NULL);
Server *server = new Server(wrapped_server);
server->Wrap(args.This());
NanReturnValue(args.This());
diff --git a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
index 696069c200..ea2b01ee1d 100644
--- a/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
+++ b/src/objective-c/GRPCClient/private/GRPCCompletionQueue.m
@@ -43,7 +43,7 @@
- (instancetype)init {
if ((self = [super init])) {
- _unmanagedQueue = grpc_completion_queue_create();
+ _unmanagedQueue = grpc_completion_queue_create(NULL);
// This is for the following block to capture the pointer by value (instead
// of retaining self and doing self->_unmanagedQueue). This is essential
@@ -64,7 +64,8 @@
while (YES) {
// The following call blocks until an event is available.
grpc_event event = grpc_completion_queue_next(unmanagedQueue,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL);
GRPCQueueCompletionHandler handler;
switch (event.type) {
case GRPC_OP_COMPLETE:
diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m
index d902f95b51..a7142d0f00 100644
--- a/src/objective-c/GRPCClient/private/GRPCHost.m
+++ b/src/objective-c/GRPCClient/private/GRPCHost.m
@@ -97,7 +97,7 @@
queue.unmanagedQueue,
path.UTF8String,
self.hostName.UTF8String,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
}
- (GRPCChannel *)channel {
diff --git a/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m b/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m
index 070a529629..15b6ffc75c 100644
--- a/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m
+++ b/src/objective-c/GRPCClient/private/GRPCUnsecuredChannel.m
@@ -38,7 +38,7 @@
@implementation GRPCUnsecuredChannel
- (instancetype)initWithHost:(NSString *)host {
- return (self = [super initWithChannel:grpc_insecure_channel_create(host.UTF8String, NULL)]);
+ return (self = [super initWithChannel:grpc_insecure_channel_create(host.UTF8String, NULL, NULL)]);
}
// TODO(jcanizales): GRPCSecureChannel and GRPCUnsecuredChannel are just convenience initializers
diff --git a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
index 951c051036..fe3d51da53 100644
--- a/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
+++ b/src/objective-c/GRPCClient/private/GRPCWrappedCall.m
@@ -282,7 +282,7 @@
for (GRPCOperation *operation in operations) {
[operation finish];
}
- }));
+ }), NULL);
gpr_free(ops_array);
if (error != GRPC_CALL_OK) {
@@ -293,7 +293,7 @@
}
- (void)cancel {
- grpc_call_cancel(_call);
+ grpc_call_cancel(_call, NULL);
}
- (void)dealloc {
diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c
index 01ec909b79..1cf766c312 100644
--- a/src/php/ext/grpc/call.c
+++ b/src/php/ext/grpc/call.c
@@ -241,7 +241,7 @@ PHP_METHOD(Call, __construct) {
deadline_obj TSRMLS_CC);
call->wrapped = grpc_channel_create_call(
channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method,
- channel->target, deadline->wrapped);
+ channel->target, deadline->wrapped, NULL);
}
/**
@@ -400,7 +400,8 @@ PHP_METHOD(Call, startBatch) {
ops[op_num].flags = 0;
op_num++;
}
- error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped);
+ error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped,
+ NULL);
if (error != GRPC_CALL_OK) {
zend_throw_exception(spl_ce_LogicException,
"start_batch was called incorrectly",
@@ -408,7 +409,7 @@ PHP_METHOD(Call, startBatch) {
goto cleanup;
}
event = grpc_completion_queue_pluck(completion_queue, call->wrapped,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (!event.success) {
zend_throw_exception(spl_ce_LogicException,
"The batch failed for some reason",
@@ -489,7 +490,7 @@ PHP_METHOD(Call, getPeer) {
PHP_METHOD(Call, cancel) {
wrapped_grpc_call *call =
(wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC);
- grpc_call_cancel(call->wrapped);
+ grpc_call_cancel(call->wrapped, NULL);
}
static zend_function_entry call_methods[] = {
diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c
index 447cfc15be..f8ce04d902 100644
--- a/src/php/ext/grpc/channel.c
+++ b/src/php/ext/grpc/channel.c
@@ -154,7 +154,7 @@ PHP_METHOD(Channel, __construct) {
override = target;
override_len = target_length;
if (args_array == NULL) {
- channel->wrapped = grpc_insecure_channel_create(target, NULL);
+ channel->wrapped = grpc_insecure_channel_create(target, NULL, NULL);
} else {
array_hash = Z_ARRVAL_P(args_array);
if (zend_hash_find(array_hash, "credentials", sizeof("credentials"),
@@ -184,7 +184,7 @@ PHP_METHOD(Channel, __construct) {
}
php_grpc_read_args_array(args_array, &args);
if (creds == NULL) {
- channel->wrapped = grpc_insecure_channel_create(target, &args);
+ channel->wrapped = grpc_insecure_channel_create(target, &args, NULL);
} else {
gpr_log(GPR_DEBUG, "Initialized secure channel");
channel->wrapped =
@@ -255,7 +255,7 @@ PHP_METHOD(Channel, watchConnectivityState) {
deadline->wrapped, completion_queue, NULL);
grpc_event event = grpc_completion_queue_pluck(
completion_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
RETURN_BOOL(event.success);
}
diff --git a/src/php/ext/grpc/completion_queue.c b/src/php/ext/grpc/completion_queue.c
index c653a592ef..741204b0b1 100644
--- a/src/php/ext/grpc/completion_queue.c
+++ b/src/php/ext/grpc/completion_queue.c
@@ -38,14 +38,13 @@
grpc_completion_queue *completion_queue;
void grpc_php_init_completion_queue(TSRMLS_D) {
- completion_queue = grpc_completion_queue_create();
+ completion_queue = grpc_completion_queue_create(NULL);
}
void grpc_php_shutdown_completion_queue(TSRMLS_D) {
grpc_completion_queue_shutdown(completion_queue);
while (grpc_completion_queue_next(completion_queue,
- gpr_inf_future(GPR_CLOCK_REALTIME))
- .type != GRPC_QUEUE_SHUTDOWN)
- ;
+ gpr_inf_future(GPR_CLOCK_REALTIME),
+ NULL).type != GRPC_QUEUE_SHUTDOWN);
grpc_completion_queue_destroy(completion_queue);
}
diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c
index d58aa884ca..ca129e76ca 100644
--- a/src/php/ext/grpc/server.c
+++ b/src/php/ext/grpc/server.c
@@ -66,7 +66,7 @@ void free_wrapped_grpc_server(void *object TSRMLS_DC) {
grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL);
grpc_server_cancel_all_calls(server->wrapped);
grpc_completion_queue_pluck(completion_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
grpc_server_destroy(server->wrapped);
}
efree(server);
@@ -109,13 +109,14 @@ PHP_METHOD(Server, __construct) {
return;
}
if (args_array == NULL) {
- server->wrapped = grpc_server_create(NULL);
+ server->wrapped = grpc_server_create(NULL, NULL);
} else {
php_grpc_read_args_array(args_array, &args);
- server->wrapped = grpc_server_create(&args);
+ server->wrapped = grpc_server_create(&args, NULL);
efree(args.args);
}
- grpc_server_register_completion_queue(server->wrapped, completion_queue);
+ grpc_server_register_completion_queue(server->wrapped, completion_queue,
+ NULL);
}
/**
@@ -146,7 +147,7 @@ PHP_METHOD(Server, requestCall) {
goto cleanup;
}
event = grpc_completion_queue_pluck(completion_queue, NULL,
- gpr_inf_future(GPR_CLOCK_REALTIME));
+ gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
if (!event.success) {
zend_throw_exception(spl_ce_LogicException,
"Failed to request a call for some reason",
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/call.c b/src/python/grpcio/grpc/_adapter/_c/types/call.c
index 5e46605c45..42a50151f6 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/call.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/call.c
@@ -132,7 +132,7 @@ PyObject *pygrpc_Call_start_batch(Call *self, PyObject *args, PyObject *kwargs)
}
}
tag = pygrpc_produce_batch_tag(user_tag, self, ops, nops);
- errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag);
+ errcode = grpc_call_start_batch(self->c_call, tag->ops, tag->nops, tag, NULL);
gpr_free(ops);
return PyInt_FromLong(errcode);
}
@@ -152,13 +152,13 @@ PyObject *pygrpc_Call_cancel(Call *self, PyObject *args, PyObject *kwargs) {
return NULL;
}
code = PyInt_AsLong(py_code);
- errcode = grpc_call_cancel_with_status(self->c_call, code, details);
+ errcode = grpc_call_cancel_with_status(self->c_call, code, details, NULL);
} else if (py_code != NULL || details != NULL) {
PyErr_SetString(PyExc_ValueError,
"if `code` is specified, so must `details`");
return NULL;
} else {
- errcode = grpc_call_cancel(self->c_call);
+ errcode = grpc_call_cancel(self->c_call, NULL);
}
return PyInt_FromLong(errcode);
}
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/channel.c b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
index eb9d43d154..c577ac05eb 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/channel.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/channel.c
@@ -108,7 +108,7 @@ Channel *pygrpc_Channel_new(
if (creds) {
self->c_chan = grpc_secure_channel_create(creds->c_creds, target, &c_args);
} else {
- self->c_chan = grpc_insecure_channel_create(target, &c_args);
+ self->c_chan = grpc_insecure_channel_create(target, &c_args, NULL);
}
pygrpc_discard_channel_args(c_args);
return self;
@@ -133,7 +133,7 @@ Call *pygrpc_Channel_create_call(
call = pygrpc_Call_new_empty(cq);
call->c_call = grpc_channel_create_call(
self->c_chan, NULL, GRPC_PROPAGATE_DEFAULTS, cq->c_cq, method, host,
- pygrpc_cast_double_to_gpr_timespec(deadline));
+ pygrpc_cast_double_to_gpr_timespec(deadline), NULL);
return call;
}
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
index 2dd44b6ddd..d8bb89ca4b 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/completion_queue.c
@@ -90,7 +90,7 @@ PyTypeObject pygrpc_CompletionQueue_type = {
CompletionQueue *pygrpc_CompletionQueue_new(
PyTypeObject *type, PyObject *args, PyObject *kwargs) {
CompletionQueue *self = (CompletionQueue *)type->tp_alloc(type, 0);
- self->c_cq = grpc_completion_queue_create();
+ self->c_cq = grpc_completion_queue_create(NULL);
return self;
}
@@ -111,7 +111,7 @@ PyObject *pygrpc_CompletionQueue_next(
}
Py_BEGIN_ALLOW_THREADS;
event = grpc_completion_queue_next(
- self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline));
+ self->c_cq, pygrpc_cast_double_to_gpr_timespec(deadline), NULL);
Py_END_ALLOW_THREADS;
transliterated_event = pygrpc_consume_event(event);
return transliterated_event;
diff --git a/src/python/grpcio/grpc/_adapter/_c/types/server.c b/src/python/grpcio/grpc/_adapter/_c/types/server.c
index 2a11d09d21..15c98f28eb 100644
--- a/src/python/grpcio/grpc/_adapter/_c/types/server.c
+++ b/src/python/grpcio/grpc/_adapter/_c/types/server.c
@@ -104,8 +104,8 @@ Server *pygrpc_Server_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
return NULL;
}
self = (Server *)type->tp_alloc(type, 0);
- self->c_serv = grpc_server_create(&c_args);
- grpc_server_register_completion_queue(self->c_serv, cq->c_cq);
+ self->c_serv = grpc_server_create(&c_args, NULL);
+ grpc_server_register_completion_queue(self->c_serv, cq->c_cq, NULL);
pygrpc_discard_channel_args(c_args);
self->cq = cq;
Py_INCREF(self->cq);
diff --git a/src/ruby/ext/grpc/rb_call.c b/src/ruby/ext/grpc/rb_call.c
index 88659da535..70f0795f29 100644
--- a/src/ruby/ext/grpc/rb_call.c
+++ b/src/ruby/ext/grpc/rb_call.c
@@ -170,7 +170,7 @@ static VALUE grpc_rb_call_cancel(VALUE self) {
grpc_call *call = NULL;
grpc_call_error err;
TypedData_Get_Struct(self, grpc_call, &grpc_call_data_type, call);
- err = grpc_call_cancel(call);
+ err = grpc_call_cancel(call, NULL);
if (err != GRPC_CALL_OK) {
rb_raise(grpc_rb_eCallError, "cancel failed: %s (code=%d)",
grpc_call_error_detail_of(err), err);
@@ -615,7 +615,7 @@ static VALUE grpc_rb_call_run_batch(VALUE self, VALUE cqueue, VALUE tag,
/* call grpc_call_start_batch, then wait for it to complete using
* pluck_event */
- err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag));
+ err = grpc_call_start_batch(call, st.ops, st.op_num, ROBJECT(tag), NULL);
if (err != GRPC_CALL_OK) {
grpc_run_batch_stack_cleanup(&st);
rb_raise(grpc_rb_eCallError,
diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c
index 2129ba3485..6491aa4fb4 100644
--- a/src/ruby/ext/grpc/rb_channel.c
+++ b/src/ruby/ext/grpc/rb_channel.c
@@ -147,7 +147,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) {
target_chars = StringValueCStr(target);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
if (credentials == Qnil) {
- ch = grpc_insecure_channel_create(target_chars, &args);
+ ch = grpc_insecure_channel_create(target_chars, &args, NULL);
} else {
creds = grpc_rb_get_wrapped_credentials(credentials);
ch = grpc_secure_channel_create(creds, target_chars, &args);
@@ -288,7 +288,7 @@ static VALUE grpc_rb_channel_create_call(VALUE self, VALUE cqueue,
call = grpc_channel_create_call(ch, parent_call, flags, cq, method_chars,
host_chars, grpc_rb_time_timeval(
deadline,
- /* absolute time */ 0));
+ /* absolute time */ 0), NULL);
if (call == NULL) {
rb_raise(rb_eRuntimeError, "cannot create call with method %s",
method_chars);
diff --git a/src/ruby/ext/grpc/rb_completion_queue.c b/src/ruby/ext/grpc/rb_completion_queue.c
index b6674d7682..0bc9eb2a97 100644
--- a/src/ruby/ext/grpc/rb_completion_queue.c
+++ b/src/ruby/ext/grpc/rb_completion_queue.c
@@ -56,7 +56,7 @@ typedef struct next_call_stack {
static void *grpc_rb_completion_queue_next_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->event =
- grpc_completion_queue_next(next_call->cq, next_call->timeout);
+ grpc_completion_queue_next(next_call->cq, next_call->timeout, NULL);
return NULL;
}
@@ -64,7 +64,7 @@ static void *grpc_rb_completion_queue_next_no_gil(void *param) {
static void *grpc_rb_completion_queue_pluck_no_gil(void *param) {
next_call_stack *const next_call = (next_call_stack*)param;
next_call->event = grpc_completion_queue_pluck(next_call->cq, next_call->tag,
- next_call->timeout);
+ next_call->timeout, NULL);
return NULL;
}
@@ -128,7 +128,7 @@ static rb_data_type_t grpc_rb_completion_queue_data_type = {
/* Allocates a completion queue. */
static VALUE grpc_rb_completion_queue_alloc(VALUE cls) {
- grpc_completion_queue *cq = grpc_completion_queue_create();
+ grpc_completion_queue *cq = grpc_completion_queue_create(NULL);
if (cq == NULL) {
rb_raise(rb_eArgError, "could not create a completion queue: not sure why");
}
diff --git a/src/ruby/ext/grpc/rb_server.c b/src/ruby/ext/grpc/rb_server.c
index 79a4ae8757..7e76349d2e 100644
--- a/src/ruby/ext/grpc/rb_server.c
+++ b/src/ruby/ext/grpc/rb_server.c
@@ -128,7 +128,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
TypedData_Get_Struct(self, grpc_rb_server, &grpc_rb_server_data_type,
wrapper);
grpc_rb_hash_convert_to_channel_args(channel_args, &args);
- srv = grpc_server_create(&args);
+ srv = grpc_server_create(&args, NULL);
if (args.args != NULL) {
xfree(args.args); /* Allocated by grpc_rb_hash_convert_to_channel_args */
@@ -136,7 +136,7 @@ static VALUE grpc_rb_server_init(VALUE self, VALUE cqueue, VALUE channel_args) {
if (srv == NULL) {
rb_raise(rb_eRuntimeError, "could not create a gRPC server, not sure why");
}
- grpc_server_register_completion_queue(srv, cq);
+ grpc_server_register_completion_queue(srv, cq, NULL);
wrapper->wrapped = srv;
/* Add the cq as the server's mark object. This ensures the ruby cq can't be