aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/server')
-rw-r--r--src/cpp/server/server.cc37
-rw-r--r--src/cpp/server/server_builder.cc8
-rw-r--r--src/cpp/server/server_context.cc33
3 files changed, 51 insertions, 27 deletions
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 0d31140924..fafe31e84c 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -49,7 +49,7 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/profiling/timers.h"
+#include "src/core/lib/profiling/timers.h"
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@@ -264,6 +264,7 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
+ uint32_t incoming_flags_;
grpc_call* call_;
grpc_call_details* call_details_;
gpr_timespec deadline_;
@@ -272,27 +273,25 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-static grpc_server* CreateServer(const ChannelArguments& args) {
- grpc_channel_args channel_args;
- args.SetChannelArgs(&channel_args);
- return grpc_server_create(&channel_args, nullptr);
-}
-
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_message_size, const ChannelArguments& args)
+ int max_message_size, ChannelArguments* args)
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
- server_(CreateServer(args)),
+ server_(nullptr),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
+ global_callbacks_->UpdateArguments(args);
+ grpc_channel_args channel_args;
+ args->SetChannelArgs(&channel_args);
+ server_ = grpc_server_create(&channel_args, nullptr);
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
@@ -322,6 +321,19 @@ void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
g_callbacks.reset(callbacks);
}
+static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
+ RpcServiceMethod* method) {
+ switch (method->method_type()) {
+ case RpcMethod::NORMAL_RPC:
+ case RpcMethod::SERVER_STREAMING:
+ return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
+ case RpcMethod::CLIENT_STREAMING:
+ case RpcMethod::BIDI_STREAMING:
+ return GRPC_SRM_PAYLOAD_NONE;
+ }
+ GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
+}
+
bool Server::RegisterService(const grpc::string* host, Service* service) {
bool has_async_methods = service->has_async_methods();
if (has_async_methods) {
@@ -335,8 +347,9 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
RpcServiceMethod* method = it->get();
- void* tag = grpc_server_register_method(server_, method->name(),
- host ? host->c_str() : nullptr);
+ void* tag = grpc_server_register_method(
+ server_, method->name(), host ? host->c_str() : nullptr,
+ PayloadHandlingForMethod(method), 0);
if (tag == nullptr) {
gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
method->name());
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c54cf6474f..68cc38258c 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -33,10 +33,10 @@
#include <grpc++/server_builder.h>
-#include <grpc/support/cpu.h>
-#include <grpc/support/log.h>
#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
+#include <grpc/support/cpu.h>
+#include <grpc/support/log.h>
#include "src/cpp/server/thread_pool_interface.h"
namespace grpc {
@@ -103,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
- new Server(thread_pool.release(), true, max_message_size_, args));
+ new Server(thread_pool.release(), true, max_message_size_, &args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index e205a1969b..e05a7df28a 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -1,6 +1,6 @@
/*
*
- * Copyright 2015-2016, Google Inc.
+ * Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@@ -42,7 +42,8 @@
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
-#include "src/core/channel/compress_filter.h"
+#include "src/core/lib/channel/compress_filter.h"
+#include "src/core/lib/surface/call.h"
#include "src/cpp/common/create_auth_context.h"
namespace grpc {
@@ -62,7 +63,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void FillOps(grpc_op* ops, size_t* nops) GRPC_OVERRIDE;
bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
- bool CheckCancelled(CompletionQueue* cq);
+ bool CheckCancelled(CompletionQueue* cq) {
+ cq->TryPluck(this);
+ return CheckCancelledNoPluck();
+ }
+ bool CheckCancelledAsync() { return CheckCancelledNoPluck(); }
void set_tag(void* tag) {
has_tag_ = true;
@@ -72,6 +77,11 @@ class ServerContext::CompletionOp GRPC_FINAL : public CallOpSetInterface {
void Unref();
private:
+ bool CheckCancelledNoPluck() {
+ grpc::lock_guard<grpc::mutex> g(mu_);
+ return finalized_ ? (cancelled_ != 0) : false;
+ }
+
bool has_tag_;
void* tag_;
grpc::mutex mu_;
@@ -88,12 +98,6 @@ void ServerContext::CompletionOp::Unref() {
}
}
-bool ServerContext::CompletionOp::CheckCancelled(CompletionQueue* cq) {
- cq->TryPluck(this);
- grpc::lock_guard<grpc::mutex> g(mu_);
- return finalized_ ? cancelled_ != 0 : false;
-}
-
void ServerContext::CompletionOp::FillOps(grpc_op* ops, size_t* nops) {
ops->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
ops->data.recv_close_on_server.cancelled = &cancelled_;
@@ -182,12 +186,19 @@ void ServerContext::TryCancel() const {
}
bool ServerContext::IsCancelled() const {
- return completion_op_ && completion_op_->CheckCancelled(cq_);
+ if (has_notify_when_done_tag_) {
+ // when using async API, but the result is only valid
+ // if the tag has already been delivered at the completion queue
+ return completion_op_ && completion_op_->CheckCancelledAsync();
+ } else {
+ // when using sync API
+ return completion_op_ && completion_op_->CheckCancelled(cq_);
+ }
}
void ServerContext::set_compression_level(grpc_compression_level level) {
const grpc_compression_algorithm algorithm_for_level =
- grpc_compression_algorithm_for_level(level);
+ grpc_call_compression_for_level(call_, level);
set_compression_algorithm(algorithm_for_level);
}