aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--grpc.def6
-rw-r--r--include/grpc++/alarm.h2
-rw-r--r--include/grpc++/channel.h16
-rw-r--r--include/grpc++/impl/codegen/async_stream.h239
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h64
-rw-r--r--include/grpc++/impl/codegen/byte_buffer.h21
-rw-r--r--include/grpc++/impl/codegen/call.h17
-rw-r--r--include/grpc++/impl/codegen/call_hook.h2
-rw-r--r--include/grpc++/impl/codegen/channel_interface.h45
-rw-r--r--include/grpc++/impl/codegen/client_context.h23
-rw-r--r--include/grpc++/impl/codegen/client_unary_call.h73
-rw-r--r--include/grpc++/impl/codegen/completion_queue.h51
-rw-r--r--include/grpc++/impl/codegen/completion_queue_tag.h2
-rw-r--r--include/grpc++/impl/codegen/metadata_map.h2
-rw-r--r--include/grpc++/impl/codegen/method_handler_impl.h2
-rw-r--r--include/grpc++/impl/codegen/rpc_method.h3
-rw-r--r--include/grpc++/impl/codegen/rpc_service_method.h3
-rw-r--r--include/grpc++/impl/codegen/server_context.h27
-rw-r--r--include/grpc++/impl/codegen/server_interface.h42
-rw-r--r--include/grpc++/impl/codegen/service_type.h46
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h327
-rw-r--r--include/grpc++/impl/codegen/time.h6
-rw-r--r--include/grpc++/server.h3
-rw-r--r--include/grpc++/server_builder.h1
-rw-r--r--include/grpc/grpc_security.h74
-rw-r--r--include/grpc/grpc_security_constants.h7
-rw-r--r--src/compiler/cpp_generator.cc151
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.cc156
-rw-r--r--src/core/lib/security/credentials/ssl/ssl_credentials.h12
-rw-r--r--src/core/lib/security/transport/security_connector.cc251
-rw-r--r--src/core/lib/security/transport/security_connector.h4
-rw-r--r--src/cpp/client/channel_cc.cc12
-rw-r--r--src/cpp/client/generic_stub.cc10
-rw-r--r--src/cpp/common/completion_queue_cc.cc4
-rw-r--r--src/cpp/server/health/default_health_check_service.cc9
-rw-r--r--src/cpp/server/health/default_health_check_service.h2
-rw-r--r--src/cpp/server/server_cc.cc69
-rw-r--r--src/cpp/server/server_context.cc8
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.c12
-rw-r--r--src/ruby/ext/grpc/rb_grpc_imports.generated.h18
-rw-r--r--test/cpp/codegen/compiler_test_golden17
-rw-r--r--test/cpp/microbenchmarks/bm_cq.cc2
-rw-r--r--test/cpp/qps/client_async.cc28
-rw-r--r--test/cpp/qps/server_async.cc3
44 files changed, 1227 insertions, 645 deletions
diff --git a/grpc.def b/grpc.def
index e4281f3ab6..eea12bdda5 100644
--- a/grpc.def
+++ b/grpc.def
@@ -132,8 +132,14 @@ EXPORTS
grpc_metadata_credentials_create_from_plugin
grpc_secure_channel_create
grpc_server_credentials_release
+ grpc_ssl_server_certificate_config_create
+ grpc_ssl_server_certificate_config_destroy
grpc_ssl_server_credentials_create
grpc_ssl_server_credentials_create_ex
+ grpc_ssl_server_credentials_create_options_using_config
+ grpc_ssl_server_credentials_create_options_using_config_fetcher
+ grpc_ssl_server_credentials_options_destroy
+ grpc_ssl_server_credentials_create_with_options
grpc_server_add_secure_http2_port
grpc_call_set_credentials
grpc_server_credentials_set_auth_metadata_processor
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h
index 2d88d868e5..b43425e224 100644
--- a/include/grpc++/alarm.h
+++ b/include/grpc++/alarm.h
@@ -92,7 +92,7 @@ class Alarm : private GrpcLibraryCodegen {
}
private:
- class AlarmEntry : public CompletionQueueTag {
+ class AlarmEntry : public internal::CompletionQueueTag {
public:
AlarmEntry(void* tag) : tag_(tag) {}
void Set(void* tag) { tag_ = tag; }
diff --git a/include/grpc++/channel.h b/include/grpc++/channel.h
index c50091d6ac..e9fb5a5d09 100644
--- a/include/grpc++/channel.h
+++ b/include/grpc++/channel.h
@@ -32,7 +32,7 @@ struct grpc_channel;
namespace grpc {
/// Channels represent a connection to an endpoint. Created by \a CreateChannel.
class Channel final : public ChannelInterface,
- public CallHook,
+ public internal::CallHook,
public std::enable_shared_from_this<Channel>,
private GrpcLibraryCodegen {
public:
@@ -51,18 +51,16 @@ class Channel final : public ChannelInterface,
private:
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
+ friend class internal::BlockingUnaryCallImpl;
friend std::shared_ptr<Channel> CreateChannelInternal(
const grpc::string& host, grpc_channel* c_channel);
Channel(const grpc::string& host, grpc_channel* c_channel);
- Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) override;
- void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;
+ internal::Call CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) override;
+ void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) override;
void* RegisterMethod(const char* method) override;
void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index e60572fc93..4476033463 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -30,6 +30,7 @@ namespace grpc {
class CompletionQueue;
+namespace internal {
/// Common interface for all client side asynchronous streaming.
class ClientAsyncStreamingInterface {
public:
@@ -151,15 +152,16 @@ class AsyncWriterInterface {
}
};
+} // namespace internal
+
template <class R>
-class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
- public AsyncReaderInterface<R> {};
+class ClientAsyncReaderInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncReaderInterface<R> {};
-/// Async client-side API for doing server-streaming RPCs,
-/// where the incoming message stream coming from the server has
-/// messages of type \a R.
+namespace internal {
template <class R>
-class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
+class ClientAsyncReaderFactory {
public:
/// Create a stream object.
/// Write the first request out if \a start is set.
@@ -169,16 +171,25 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
template <class W>
- static ClientAsyncReader* Create(ChannelInterface* channel,
- CompletionQueue* cq, const RpcMethod& method,
- ClientContext* context, const W& request,
- bool start, void* tag) {
- Call call = channel->CreateCall(method, context, cq);
+ static ClientAsyncReader<R>* Create(ChannelInterface* channel,
+ CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request,
+ bool start, void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncReader)))
- ClientAsyncReader(call, context, request, start, tag);
+ call.call(), sizeof(ClientAsyncReader<R>)))
+ ClientAsyncReader<R>(call, context, request, start, tag);
}
+};
+} // namespace internal
+/// Async client-side API for doing server-streaming RPCs,
+/// where the incoming message stream coming from the server has
+/// messages of type \a R.
+template <class R>
+class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
+ public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientAsyncReader));
@@ -233,9 +244,10 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
}
private:
+ friend class internal::ClientAsyncReaderFactory<R>;
template <class W>
- ClientAsyncReader(Call call, ClientContext* context, const W& request,
- bool start, void* tag)
+ ClientAsyncReader(::grpc::internal::Call call, ClientContext* context,
+ const W& request, bool start, void* tag)
: context_(context), call_(call), started_(start) {
// TODO(ctiller): don't assert
GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
@@ -255,19 +267,27 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
}
ClientContext* context_;
- Call call_;
+ ::grpc::internal::Call call_;
bool started_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
init_ops_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ finish_ops_;
};
/// Common interface for client side asynchronous writing.
template <class W>
-class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
- public AsyncWriterInterface<W> {
+class ClientAsyncWriterInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W> {
public:
/// Signal the client is done with the writes (half-close the client stream).
/// Thread-safe with respect to \a AsyncReaderInterface::Read
@@ -276,11 +296,9 @@ class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
virtual void WritesDone(void* tag) = 0;
};
-/// Async API on the client side for doing client-streaming RPCs,
-/// where the outgoing message stream going to the server contains
-/// messages of type \a W.
+namespace internal {
template <class W>
-class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
+class ClientAsyncWriterFactory {
public:
/// Create a stream object.
/// Start the RPC if \a start is set
@@ -294,16 +312,25 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
/// message from the server upon a successful call to the \a Finish
/// method of this instance.
template <class R>
- static ClientAsyncWriter* Create(ChannelInterface* channel,
- CompletionQueue* cq, const RpcMethod& method,
- ClientContext* context, R* response,
- bool start, void* tag) {
- Call call = channel->CreateCall(method, context, cq);
+ static ClientAsyncWriter<W>* Create(ChannelInterface* channel,
+ CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response,
+ bool start, void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncWriter)))
- ClientAsyncWriter(call, context, response, start, tag);
+ call.call(), sizeof(ClientAsyncWriter<W>)))
+ ClientAsyncWriter<W>(call, context, response, start, tag);
}
+};
+} // namespace internal
+/// Async API on the client side for doing client-streaming RPCs,
+/// where the outgoing message stream going to the server contains
+/// messages of type \a W.
+template <class W>
+class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
+ public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientAsyncWriter));
@@ -376,9 +403,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
}
private:
+ friend class internal::ClientAsyncWriterFactory<W>;
template <class R>
- ClientAsyncWriter(Call call, ClientContext* context, R* response, bool start,
- void* tag)
+ ClientAsyncWriter(::grpc::internal::Call call, ClientContext* context,
+ R* response, bool start, void* tag)
: context_(context), call_(call), started_(start) {
finish_ops_.RecvMessage(response);
finish_ops_.AllowNoMessage();
@@ -401,13 +429,17 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
}
ClientContext* context_;
- Call call_;
+ ::grpc::internal::Call call_;
bool started_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
write_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpGenericRecvMessage,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_ops_;
};
@@ -415,9 +447,10 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
/// where the client-to-server message stream has messages of type \a W,
/// and the server-to-client message stream has messages of type \a R.
template <class W, class R>
-class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
- public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
+class ClientAsyncReaderWriterInterface
+ : public internal::ClientAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W>,
+ public internal::AsyncReaderInterface<R> {
public:
/// Signal the client is done with the writes (half-close the client stream).
/// Thread-safe with respect to \a AsyncReaderInterface::Read
@@ -426,13 +459,9 @@ class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
virtual void WritesDone(void* tag) = 0;
};
-/// Async client-side interface for bi-directional streaming,
-/// where the outgoing message stream going to the server
-/// has messages of type \a W, and the incoming message stream coming
-/// from the server has messages of type \a R.
+namespace internal {
template <class W, class R>
-class ClientAsyncReaderWriter final
- : public ClientAsyncReaderWriterInterface<W, R> {
+class ClientAsyncReaderWriterFactory {
public:
/// Create a stream object.
/// Start the RPC request if \a start is set.
@@ -441,18 +470,27 @@ class ClientAsyncReaderWriter final
/// nullptr and the actual call must be initiated by StartCall
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
- static ClientAsyncReaderWriter* Create(ChannelInterface* channel,
- CompletionQueue* cq,
- const RpcMethod& method,
- ClientContext* context, bool start,
- void* tag) {
- Call call = channel->CreateCall(method, context, cq);
+ static ClientAsyncReaderWriter<W, R>* Create(
+ ChannelInterface* channel, CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method, ClientContext* context,
+ bool start, void* tag) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncReaderWriter)))
- ClientAsyncReaderWriter(call, context, start, tag);
+ call.call(), sizeof(ClientAsyncReaderWriter<W, R>)))
+ ClientAsyncReaderWriter<W, R>(call, context, start, tag);
}
+};
+} // namespace internal
+/// Async client-side interface for bi-directional streaming,
+/// where the outgoing message stream going to the server
+/// has messages of type \a W, and the incoming message stream coming
+/// from the server has messages of type \a R.
+template <class W, class R>
+class ClientAsyncReaderWriter final
+ : public ClientAsyncReaderWriterInterface<W, R> {
+ public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientAsyncReaderWriter));
@@ -532,8 +570,9 @@ class ClientAsyncReaderWriter final
}
private:
- ClientAsyncReaderWriter(Call call, ClientContext* context, bool start,
- void* tag)
+ friend class internal::ClientAsyncReaderWriterFactory<W, R>;
+ ClientAsyncReaderWriter(::grpc::internal::Call call, ClientContext* context,
+ bool start, void* tag)
: context_(context), call_(call), started_(start) {
if (start) {
StartCallInternal(tag);
@@ -554,18 +593,26 @@ class ClientAsyncReaderWriter final
}
ClientContext* context_;
- Call call_;
+ ::grpc::internal::Call call_;
bool started_;
- CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
write_ops_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ finish_ops_;
};
template <class W, class R>
-class ServerAsyncReaderInterface : public ServerAsyncStreamingInterface,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncReaderInterface<R> {
public:
/// Indicate that the stream is to be finished with a certain status code
/// and also send out \a msg response to the client.
@@ -692,20 +739,23 @@ class ServerAsyncReader final : public ServerAsyncReaderInterface<W, R> {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
finish_ops_;
};
template <class W>
-class ServerAsyncWriterInterface : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W> {
+class ServerAsyncWriterInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W> {
public:
/// Indicate that the stream is to be finished with a certain status code.
/// Request notification for when the server has sent the appropriate
@@ -823,7 +873,7 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
template <class T>
void EnsureInitialMetadataSent(T* ops) {
@@ -837,20 +887,25 @@ class ServerAsyncWriter final : public ServerAsyncWriterInterface<W> {
}
}
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
};
/// Server-side interface for asynchronous bi-directional streaming.
template <class W, class R>
-class ServerAsyncReaderWriterInterface : public ServerAsyncStreamingInterface,
- public AsyncWriterInterface<W>,
- public AsyncReaderInterface<R> {
+class ServerAsyncReaderWriterInterface
+ : public internal::ServerAsyncStreamingInterface,
+ public internal::AsyncWriterInterface<W>,
+ public internal::AsyncReaderInterface<R> {
public:
/// Indicate that the stream is to be finished with a certain status code.
/// Request notification for when the server has sent the appropriate
@@ -980,7 +1035,7 @@ class ServerAsyncReaderWriter final
private:
friend class ::grpc::Server;
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
template <class T>
void EnsureInitialMetadataSent(T* ops) {
@@ -994,14 +1049,18 @@ class ServerAsyncReaderWriter final
}
}
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_ops_;
- CallOpSet<CallOpRecvMessage<R>> read_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvMessage<R>> read_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
write_ops_;
- CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpServerSendStatus>
+ finish_ops_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index e472f04f56..6d51c78d5c 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -69,11 +69,9 @@ class ClientAsyncResponseReaderInterface {
virtual void Finish(R* msg, Status* status, void* tag) = 0;
};
-/// Async API for client-side unary RPCs, where the message response
-/// received from the server is of type \a R.
+namespace internal {
template <class R>
-class ClientAsyncResponseReader final
- : public ClientAsyncResponseReaderInterface<R> {
+class ClientAsyncResponseReaderFactory {
public:
/// Start a call and write the request out if \a start is set.
/// \a tag will be notified on \a cq when the call has been started (i.e.
@@ -82,17 +80,24 @@ class ClientAsyncResponseReader final
/// Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
template <class W>
- static ClientAsyncResponseReader* Create(ChannelInterface* channel,
- CompletionQueue* cq,
- const RpcMethod& method,
- ClientContext* context,
- const W& request, bool start) {
- Call call = channel->CreateCall(method, context, cq);
+ static ClientAsyncResponseReader<R>* Create(
+ ChannelInterface* channel, CompletionQueue* cq,
+ const ::grpc::internal::RpcMethod& method, ClientContext* context,
+ const W& request, bool start) {
+ ::grpc::internal::Call call = channel->CreateCall(method, context, cq);
return new (g_core_codegen_interface->grpc_call_arena_alloc(
- call.call(), sizeof(ClientAsyncResponseReader)))
- ClientAsyncResponseReader(call, context, request, start);
+ call.call(), sizeof(ClientAsyncResponseReader<R>)))
+ ClientAsyncResponseReader<R>(call, context, request, start);
}
+};
+} // namespace internal
+/// Async API for client-side unary RPCs, where the message response
+/// received from the server is of type \a R.
+template <class R>
+class ClientAsyncResponseReader final
+ : public ClientAsyncResponseReaderInterface<R> {
+ public:
// always allocated against a call arena, no memory free required
static void operator delete(void* ptr, std::size_t size) {
assert(size == sizeof(ClientAsyncResponseReader));
@@ -137,13 +142,14 @@ class ClientAsyncResponseReader final
}
private:
+ friend class internal::ClientAsyncResponseReaderFactory<R>;
ClientContext* const context_;
- Call call_;
+ ::grpc::internal::Call call_;
bool started_;
template <class W>
- ClientAsyncResponseReader(Call call, ClientContext* context, const W& request,
- bool start)
+ ClientAsyncResponseReader(::grpc::internal::Call call, ClientContext* context,
+ const W& request, bool start)
: context_(context), call_(call), started_(start) {
// Bind the metadata at time of StartCallInternal but set up the rest here
// TODO(ctiller): don't assert
@@ -162,19 +168,23 @@ class ClientAsyncResponseReader final
static void* operator new(std::size_t size);
static void* operator new(std::size_t size, void* p) { return p; }
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ ::grpc::internal::SneakyCallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
init_buf;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ meta_buf;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_buf;
};
/// Async server-side API for handling unary calls, where the single
/// response message sent to the client is of type \a W.
template <class W>
-class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
+class ServerAsyncResponseWriter final
+ : public internal::ServerAsyncStreamingInterface {
public:
explicit ServerAsyncResponseWriter(ServerContext* ctx)
: call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
@@ -262,13 +272,15 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
}
private:
- void BindCall(Call* call) override { call_ = *call; }
+ void BindCall(::grpc::internal::Call* call) override { call_ = *call; }
- Call call_;
+ ::grpc::internal::Call call_;
ServerContext* ctx_;
- CallOpSet<CallOpSendInitialMetadata> meta_buf_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpServerSendStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ meta_buf_;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpServerSendStatus>
finish_buf_;
};
diff --git a/include/grpc++/impl/codegen/byte_buffer.h b/include/grpc++/impl/codegen/byte_buffer.h
index 57d731be18..fe73ce7a83 100644
--- a/include/grpc++/impl/codegen/byte_buffer.h
+++ b/include/grpc++/impl/codegen/byte_buffer.h
@@ -31,18 +31,19 @@
namespace grpc {
+namespace internal {
+class CallOpSendMessage;
template <class R>
class CallOpRecvMessage;
+class CallOpGenericRecvMessage;
class MethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
class ServerStreamingHandler;
-namespace CallOpGenericRecvMessageHelper {
template <class R>
class DeserializeFuncType;
-} // namespace CallOpGenericRecvMessageHelper
-
+} // namespace internal
/// A sequence of bytes.
class ByteBuffer final {
public:
@@ -97,17 +98,17 @@ class ByteBuffer final {
private:
friend class SerializationTraits<ByteBuffer, void>;
- friend class CallOpSendMessage;
+ friend class internal::CallOpSendMessage;
template <class R>
- friend class CallOpRecvMessage;
- friend class CallOpGenericRecvMessage;
- friend class MethodHandler;
+ friend class internal::CallOpRecvMessage;
+ friend class internal::CallOpGenericRecvMessage;
+ friend class internal::MethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
+ friend class internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
+ friend class internal::ServerStreamingHandler;
template <class R>
- friend class CallOpGenericRecvMessageHelper::DeserializeFuncType;
+ friend class internal::DeserializeFuncType;
grpc_byte_buffer* buffer_;
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index 06f107fa83..1a988297dc 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -43,11 +43,13 @@
namespace grpc {
class ByteBuffer;
-class Call;
-class CallHook;
class CompletionQueue;
extern CoreCodegenInterface* g_core_codegen_interface;
+namespace internal {
+class Call;
+class CallHook;
+
const char kBinaryErrorDetailsKey[] = "grpc-status-details-bin";
// TODO(yangg) if the map is changed before we send, the pointers will be a
@@ -75,6 +77,7 @@ inline grpc_metadata* FillMetadataArray(
}
return metadata_array;
}
+} // namespace internal
/// Per-message write options.
class WriteOptions {
@@ -199,6 +202,7 @@ class WriteOptions {
bool last_message_;
};
+namespace internal {
/// Default argument for CallOpSet. I is unused by the class, but can be
/// used for generating multiple names for the same thing.
template <int I>
@@ -387,7 +391,6 @@ class CallOpRecvMessage {
bool allow_not_getting_message_;
};
-namespace CallOpGenericRecvMessageHelper {
class DeserializeFunc {
public:
virtual Status Deserialize(ByteBuffer* buf) = 0;
@@ -407,7 +410,6 @@ class DeserializeFuncType final : public DeserializeFunc {
private:
R* message_; // Not a managed pointer because management is external to this
};
-} // namespace CallOpGenericRecvMessageHelper
class CallOpGenericRecvMessage {
public:
@@ -418,8 +420,7 @@ class CallOpGenericRecvMessage {
void RecvMessage(R* message) {
// Use an explicit base class pointer to avoid resolution error in the
// following unique_ptr::reset for some old implementations.
- CallOpGenericRecvMessageHelper::DeserializeFunc* func =
- new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message);
+ DeserializeFunc* func = new DeserializeFuncType<R>(message);
deserialize_.reset(func);
}
@@ -459,7 +460,7 @@ class CallOpGenericRecvMessage {
}
private:
- std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_;
+ std::unique_ptr<DeserializeFunc> deserialize_;
ByteBuffer recv_buf_;
bool allow_not_getting_message_;
};
@@ -714,7 +715,7 @@ class Call final {
grpc_call* call_;
int max_receive_message_size_;
};
-
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CALL_H
diff --git a/include/grpc++/impl/codegen/call_hook.h b/include/grpc++/impl/codegen/call_hook.h
index d026cc8b58..44e9de220e 100644
--- a/include/grpc++/impl/codegen/call_hook.h
+++ b/include/grpc++/impl/codegen/call_hook.h
@@ -21,6 +21,7 @@
namespace grpc {
+namespace internal {
class CallOpSetInterface;
class Call;
@@ -31,6 +32,7 @@ class CallHook {
virtual ~CallHook() {}
virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/channel_interface.h b/include/grpc++/impl/codegen/channel_interface.h
index 1b7590bf0c..769f853974 100644
--- a/include/grpc++/impl/codegen/channel_interface.h
+++ b/include/grpc++/impl/codegen/channel_interface.h
@@ -24,10 +24,8 @@
#include <grpc/impl/codegen/connectivity_state.h>
namespace grpc {
-class Call;
+class ChannelInterface;
class ClientContext;
-class RpcMethod;
-class CallOpSetInterface;
class CompletionQueue;
template <class R>
@@ -36,14 +34,22 @@ template <class W>
class ClientWriter;
template <class W, class R>
class ClientReaderWriter;
+
+namespace internal {
+class Call;
+class CallOpSetInterface;
+class RpcMethod;
+template <class InputMessage, class OutputMessage>
+class BlockingUnaryCallImpl;
template <class R>
-class ClientAsyncReader;
+class ClientAsyncReaderFactory;
template <class W>
-class ClientAsyncWriter;
+class ClientAsyncWriterFactory;
template <class W, class R>
-class ClientAsyncReaderWriter;
+class ClientAsyncReaderWriterFactory;
template <class R>
-class ClientAsyncResponseReader;
+class ClientAsyncResponseReaderFactory;
+} // namespace internal
/// Codegen interface for \a grpc::Channel.
class ChannelInterface {
@@ -88,23 +94,21 @@ class ChannelInterface {
template <class W, class R>
friend class ::grpc::ClientReaderWriter;
template <class R>
- friend class ::grpc::ClientAsyncReader;
+ friend class ::grpc::internal::ClientAsyncReaderFactory;
template <class W>
- friend class ::grpc::ClientAsyncWriter;
+ friend class ::grpc::internal::ClientAsyncWriterFactory;
template <class W, class R>
- friend class ::grpc::ClientAsyncReaderWriter;
+ friend class ::grpc::internal::ClientAsyncReaderWriterFactory;
template <class R>
- friend class ::grpc::ClientAsyncResponseReader;
+ friend class ::grpc::internal::ClientAsyncResponseReaderFactory;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
- friend class ::grpc::RpcMethod;
- virtual Call CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) = 0;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
+ friend class ::grpc::internal::BlockingUnaryCallImpl;
+ friend class ::grpc::internal::RpcMethod;
+ virtual internal::Call CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) = 0;
+ virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) = 0;
virtual void* RegisterMethod(const char* method) = 0;
virtual void NotifyOnStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline,
@@ -112,7 +116,6 @@ class ChannelInterface {
virtual bool WaitForStateChangeImpl(grpc_connectivity_state last_observed,
gpr_timespec deadline) = 0;
};
-
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CHANNEL_INTERFACE_H
diff --git a/include/grpc++/impl/codegen/client_context.h b/include/grpc++/impl/codegen/client_context.h
index 6d7e13bbf2..22b581cbc5 100644
--- a/include/grpc++/impl/codegen/client_context.h
+++ b/include/grpc++/impl/codegen/client_context.h
@@ -60,7 +60,16 @@ class Channel;
class ChannelInterface;
class CompletionQueue;
class CallCredentials;
+class ClientContext;
+
+namespace internal {
class RpcMethod;
+class CallOpClientRecvStatus;
+class CallOpRecvInitialMetadata;
+template <class InputMessage, class OutputMessage>
+class BlockingUnaryCallImpl;
+} // namespace internal
+
template <class R>
class ClientReader;
template <class W>
@@ -345,8 +354,8 @@ class ClientContext {
ClientContext& operator=(const ClientContext&);
friend class ::grpc::testing::InteropClientContextInspector;
- friend class CallOpClientRecvStatus;
- friend class CallOpRecvInitialMetadata;
+ friend class ::grpc::internal::CallOpClientRecvStatus;
+ friend class ::grpc::internal::CallOpRecvInitialMetadata;
friend class Channel;
template <class R>
friend class ::grpc::ClientReader;
@@ -363,11 +372,7 @@ class ClientContext {
template <class R>
friend class ::grpc::ClientAsyncResponseReader;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
+ friend class ::grpc::internal::BlockingUnaryCallImpl;
grpc_call* call() const { return call_; }
void set_call(grpc_call* call, const std::shared_ptr<Channel>& channel);
@@ -399,8 +404,8 @@ class ClientContext {
mutable std::shared_ptr<const AuthContext> auth_context_;
struct census_context* census_context_;
std::multimap<grpc::string, grpc::string> send_initial_metadata_;
- MetadataMap recv_initial_metadata_;
- MetadataMap trailing_metadata_;
+ internal::MetadataMap recv_initial_metadata_;
+ internal::MetadataMap trailing_metadata_;
grpc_call* propagate_from_call_;
PropagationOptions propagation_options_;
diff --git a/include/grpc++/impl/codegen/client_unary_call.h b/include/grpc++/impl/codegen/client_unary_call.h
index 7c540fade9..170c562cf3 100644
--- a/include/grpc++/impl/codegen/client_unary_call.h
+++ b/include/grpc++/impl/codegen/client_unary_call.h
@@ -30,43 +30,60 @@ namespace grpc {
class Channel;
class ClientContext;
class CompletionQueue;
-class RpcMethod;
+namespace internal {
+class RpcMethod;
/// Wrapper that performs a blocking unary call
template <class InputMessage, class OutputMessage>
Status BlockingUnaryCall(ChannelInterface* channel, const RpcMethod& method,
ClientContext* context, const InputMessage& request,
OutputMessage* result) {
- CompletionQueue cq(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue
- Call call(channel->CreateCall(method, context, &cq));
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
- CallOpClientSendClose, CallOpClientRecvStatus>
- ops;
- Status status = ops.SendMessage(request);
- if (!status.ok()) {
- return status;
- }
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- ops.RecvInitialMetadata(context);
- ops.RecvMessage(result);
- ops.ClientSendClose();
- ops.ClientRecvStatus(context, &status);
- call.PerformOps(&ops);
- if (cq.Pluck(&ops)) {
- if (!ops.got_message && status.ok()) {
- return Status(StatusCode::UNIMPLEMENTED,
- "No message returned for unary request");
+ return BlockingUnaryCallImpl<InputMessage, OutputMessage>(
+ channel, method, context, request, result)
+ .status();
+};
+
+template <class InputMessage, class OutputMessage>
+class BlockingUnaryCallImpl {
+ public:
+ BlockingUnaryCallImpl(ChannelInterface* channel, const RpcMethod& method,
+ ClientContext* context, const InputMessage& request,
+ OutputMessage* result) {
+ CompletionQueue cq(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}); // Pluckable completion queue
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpRecvInitialMetadata, CallOpRecvMessage<OutputMessage>,
+ CallOpClientSendClose, CallOpClientRecvStatus>
+ ops;
+ status_ = ops.SendMessage(request);
+ if (!status_.ok()) {
+ return;
+ }
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ ops.RecvInitialMetadata(context);
+ ops.RecvMessage(result);
+ ops.ClientSendClose();
+ ops.ClientRecvStatus(context, &status_);
+ call.PerformOps(&ops);
+ if (cq.Pluck(&ops)) {
+ if (!ops.got_message && status_.ok()) {
+ status_ = Status(StatusCode::UNIMPLEMENTED,
+ "No message returned for unary request");
+ }
+ } else {
+ GPR_CODEGEN_ASSERT(!status_.ok());
}
- } else {
- GPR_CODEGEN_ASSERT(!status.ok());
}
- return status;
-}
+ Status status() { return status_; }
+
+ private:
+ Status status_;
+};
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_CLIENT_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/completion_queue.h b/include/grpc++/impl/codegen/completion_queue.h
index e2c0c29dca..14de30a62e 100644
--- a/include/grpc++/impl/codegen/completion_queue.h
+++ b/include/grpc++/impl/codegen/completion_queue.h
@@ -56,7 +56,19 @@ class ServerWriter;
namespace internal {
template <class W, class R>
class ServerReaderWriterBody;
-}
+} // namespace internal
+
+class Channel;
+class ChannelInterface;
+class ClientContext;
+class CompletionQueue;
+class Server;
+class ServerBuilder;
+class ServerContext;
+
+namespace internal {
+class CompletionQueueTag;
+class RpcMethod;
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
@@ -66,16 +78,11 @@ class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
-
-class Channel;
-class ChannelInterface;
-class ClientContext;
-class CompletionQueueTag;
-class CompletionQueue;
-class RpcMethod;
-class Server;
-class ServerBuilder;
-class ServerContext;
+template <class Streamer, bool WriteNeeded>
+class TemplatedBidiStreamingHandler;
+template <class InputMessage, class OutputMessage>
+class BlockingUnaryCallImpl;
+} // namespace internal
extern CoreCodegenInterface* g_core_codegen_interface;
@@ -220,22 +227,18 @@ class CompletionQueue : private GrpcLibraryCodegen {
template <class W, class R>
friend class ::grpc::internal::ServerReaderWriterBody;
template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
+ friend class ::grpc::internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ClientStreamingHandler;
+ friend class ::grpc::internal::ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
+ friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
- friend class TemplatedBidiStreamingHandler;
- friend class UnknownMethodHandler;
+ friend class ::grpc::internal::TemplatedBidiStreamingHandler;
+ friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::Server;
friend class ::grpc::ServerContext;
template <class InputMessage, class OutputMessage>
- friend Status BlockingUnaryCall(ChannelInterface* channel,
- const RpcMethod& method,
- ClientContext* context,
- const InputMessage& request,
- OutputMessage* result);
+ friend class ::grpc::internal::BlockingUnaryCallImpl;
/// EXPERIMENTAL
/// Creates a Thread Local cache to store the first event
@@ -256,7 +259,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// Wraps \a grpc_completion_queue_pluck.
/// \warning Must not be mixed with calls to \a Next.
- bool Pluck(CompletionQueueTag* tag) {
+ bool Pluck(internal::CompletionQueueTag* tag) {
auto deadline =
g_core_codegen_interface->gpr_inf_future(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
@@ -277,7 +280,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
/// implementation to simple call the other TryPluck function with a zero
/// timeout. i.e:
/// TryPluck(tag, gpr_time_0(GPR_CLOCK_REALTIME))
- void TryPluck(CompletionQueueTag* tag) {
+ void TryPluck(internal::CompletionQueueTag* tag) {
auto deadline = g_core_codegen_interface->gpr_time_0(GPR_CLOCK_REALTIME);
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
@@ -293,7 +296,7 @@ class CompletionQueue : private GrpcLibraryCodegen {
///
/// This exects tag->FinalizeResult (if called) to return 'false' i.e expects
/// that the tag is internal not something that is returned to the user.
- void TryPluck(CompletionQueueTag* tag, gpr_timespec deadline) {
+ void TryPluck(internal::CompletionQueueTag* tag, gpr_timespec deadline) {
auto ev = g_core_codegen_interface->grpc_completion_queue_pluck(
cq_, tag, deadline, nullptr);
if (ev.type == GRPC_QUEUE_TIMEOUT || ev.type == GRPC_QUEUE_SHUTDOWN) {
diff --git a/include/grpc++/impl/codegen/completion_queue_tag.h b/include/grpc++/impl/codegen/completion_queue_tag.h
index 4d7d3a98dd..cb16bcf9ff 100644
--- a/include/grpc++/impl/codegen/completion_queue_tag.h
+++ b/include/grpc++/impl/codegen/completion_queue_tag.h
@@ -21,6 +21,7 @@
namespace grpc {
+namespace internal {
/// An interface allowing implementors to process and filter event tags.
class CompletionQueueTag {
public:
@@ -31,6 +32,7 @@ class CompletionQueueTag {
/// queue
virtual bool FinalizeResult(void** tag, bool* status) = 0;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/metadata_map.h b/include/grpc++/impl/codegen/metadata_map.h
index b73985967d..fd4750efdd 100644
--- a/include/grpc++/impl/codegen/metadata_map.h
+++ b/include/grpc++/impl/codegen/metadata_map.h
@@ -23,6 +23,7 @@
namespace grpc {
+namespace internal {
class MetadataMap {
public:
MetadataMap() { memset(&arr_, 0, sizeof(arr_)); }
@@ -50,6 +51,7 @@ class MetadataMap {
grpc_metadata_array arr_;
std::multimap<grpc::string_ref, grpc::string_ref> map_;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/method_handler_impl.h b/include/grpc++/impl/codegen/method_handler_impl.h
index e14cb0e926..c0af4ca130 100644
--- a/include/grpc++/impl/codegen/method_handler_impl.h
+++ b/include/grpc++/impl/codegen/method_handler_impl.h
@@ -26,6 +26,7 @@
namespace grpc {
+namespace internal {
/// A wrapper class of an application provided rpc method handler.
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler : public MethodHandler {
@@ -266,6 +267,7 @@ class UnknownMethodHandler : public MethodHandler {
}
};
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_METHOD_HANDLER_IMPL_H
diff --git a/include/grpc++/impl/codegen/rpc_method.h b/include/grpc++/impl/codegen/rpc_method.h
index ac13ac56c7..54e52364ef 100644
--- a/include/grpc++/impl/codegen/rpc_method.h
+++ b/include/grpc++/impl/codegen/rpc_method.h
@@ -24,7 +24,7 @@
#include <grpc++/impl/codegen/channel_interface.h>
namespace grpc {
-
+namespace internal {
/// Descriptor of an RPC method
class RpcMethod {
public:
@@ -55,6 +55,7 @@ class RpcMethod {
void* const channel_tag_;
};
+} // namespace internal
} // namespace grpc
#endif // GRPCXX_IMPL_CODEGEN_RPC_METHOD_H
diff --git a/include/grpc++/impl/codegen/rpc_service_method.h b/include/grpc++/impl/codegen/rpc_service_method.h
index d356012ad6..5ba11e8559 100644
--- a/include/grpc++/impl/codegen/rpc_service_method.h
+++ b/include/grpc++/impl/codegen/rpc_service_method.h
@@ -32,8 +32,8 @@
namespace grpc {
class ServerContext;
-class StreamContextInterface;
+namespace internal {
/// Base class for running an RPC handler.
class MethodHandler {
public:
@@ -71,6 +71,7 @@ class RpcServiceMethod : public RpcMethod {
void* server_tag_;
std::unique_ptr<MethodHandler> handler_;
};
+} // namespace internal
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/server_context.h b/include/grpc++/impl/codegen/server_context.h
index b5e37fd12b..a2d6967bf8 100644
--- a/include/grpc++/impl/codegen/server_context.h
+++ b/include/grpc++/impl/codegen/server_context.h
@@ -55,7 +55,6 @@ class ServerWriter;
namespace internal {
template <class W, class R>
class ServerReaderWriterBody;
-}
template <class ServiceType, class RequestType, class ResponseType>
class RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
@@ -65,9 +64,11 @@ class ServerStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
class BidiStreamingHandler;
class UnknownMethodHandler;
-
+template <class Streamer, bool WriteNeeded>
+class TemplatedBidiStreamingHandler;
class Call;
-class CallOpBuffer;
+} // namespace internal
+
class CompletionQueue;
class Server;
class ServerInterface;
@@ -247,14 +248,14 @@ class ServerContext {
template <class W, class R>
friend class ::grpc::internal::ServerReaderWriterBody;
template <class ServiceType, class RequestType, class ResponseType>
- friend class RpcMethodHandler;
+ friend class ::grpc::internal::RpcMethodHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ClientStreamingHandler;
+ friend class ::grpc::internal::ClientStreamingHandler;
template <class ServiceType, class RequestType, class ResponseType>
- friend class ServerStreamingHandler;
+ friend class ::grpc::internal::ServerStreamingHandler;
template <class Streamer, bool WriteNeeded>
- friend class TemplatedBidiStreamingHandler;
- friend class UnknownMethodHandler;
+ friend class ::grpc::internal::TemplatedBidiStreamingHandler;
+ friend class ::grpc::internal::UnknownMethodHandler;
friend class ::grpc::ClientContext;
/// Prevent copying.
@@ -263,9 +264,9 @@ class ServerContext {
class CompletionOp;
- void BeginCompletionOp(Call* call);
+ void BeginCompletionOp(internal::Call* call);
/// Return the tag queued by BeginCompletionOp()
- CompletionQueueTag* GetCompletionOpTag();
+ internal::CompletionQueueTag* GetCompletionOpTag();
ServerContext(gpr_timespec deadline, grpc_metadata_array* arr);
@@ -282,7 +283,7 @@ class ServerContext {
CompletionQueue* cq_;
bool sent_initial_metadata_;
mutable std::shared_ptr<const AuthContext> auth_context_;
- MetadataMap client_metadata_;
+ internal::MetadataMap client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
@@ -290,7 +291,9 @@ class ServerContext {
grpc_compression_level compression_level_;
grpc_compression_algorithm compression_algorithm_;
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> pending_ops_;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpSendMessage>
+ pending_ops_;
bool has_pending_ops_;
};
diff --git a/include/grpc++/impl/codegen/server_interface.h b/include/grpc++/impl/codegen/server_interface.h
index 55937f19d7..3bcf4c87e7 100644
--- a/include/grpc++/impl/codegen/server_interface.h
+++ b/include/grpc++/impl/codegen/server_interface.h
@@ -30,20 +30,21 @@ namespace grpc {
class AsyncGenericService;
class Channel;
class GenericServerContext;
-class RpcService;
-class ServerAsyncStreamingInterface;
class ServerCompletionQueue;
class ServerContext;
class ServerCredentials;
class Service;
-class ThreadPoolInterface;
extern CoreCodegenInterface* g_core_codegen_interface;
/// Models a gRPC server.
///
/// Servers are configured and started via \a grpc::ServerBuilder.
-class ServerInterface : public CallHook {
+namespace internal {
+class ServerAsyncStreamingInterface;
+} // namespace internal
+
+class ServerInterface : public internal::CallHook {
public:
virtual ~ServerInterface() {}
@@ -78,7 +79,7 @@ class ServerInterface : public CallHook {
virtual void Wait() = 0;
protected:
- friend class Service;
+ friend class ::grpc::Service;
/// Register a service. This call does not take ownership of the service.
/// The service must exist for the lifetime of the Server instance.
@@ -116,12 +117,13 @@ class ServerInterface : public CallHook {
virtual grpc_server* server() = 0;
- virtual void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) = 0;
+ virtual void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) = 0;
- class BaseAsyncRequest : public CompletionQueueTag {
+ class BaseAsyncRequest : public internal::CompletionQueueTag {
public:
BaseAsyncRequest(ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag,
bool delete_on_finalize);
virtual ~BaseAsyncRequest();
@@ -131,7 +133,7 @@ class ServerInterface : public CallHook {
protected:
ServerInterface* const server_;
ServerContext* const context_;
- ServerAsyncStreamingInterface* const stream_;
+ internal::ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
void* const tag_;
const bool delete_on_finalize_;
@@ -141,7 +143,7 @@ class ServerInterface : public CallHook {
class RegisteredAsyncRequest : public BaseAsyncRequest {
public:
RegisteredAsyncRequest(ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq, void* tag);
// uses BaseAsyncRequest::FinalizeResult
@@ -155,7 +157,7 @@ class ServerInterface : public CallHook {
public:
NoPayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag)
: RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
@@ -170,7 +172,7 @@ class ServerInterface : public CallHook {
public:
PayloadAsyncRequest(void* registered_method, ServerInterface* server,
ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* request)
@@ -212,7 +214,7 @@ class ServerInterface : public CallHook {
void* const registered_method_;
ServerInterface* const server_;
ServerContext* const context_;
- ServerAsyncStreamingInterface* const stream_;
+ internal::ServerAsyncStreamingInterface* const stream_;
CompletionQueue* const call_cq_;
ServerCompletionQueue* const notification_cq_;
void* const tag_;
@@ -223,7 +225,7 @@ class ServerInterface : public CallHook {
class GenericAsyncRequest : public BaseAsyncRequest {
public:
GenericAsyncRequest(ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
bool delete_on_finalize);
@@ -235,8 +237,9 @@ class ServerInterface : public CallHook {
};
template <class Message>
- void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ void RequestAsyncCall(internal::RpcServiceMethod* method,
+ ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag,
Message* message) {
@@ -246,8 +249,9 @@ class ServerInterface : public CallHook {
message);
}
- void RequestAsyncCall(RpcServiceMethod* method, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ void RequestAsyncCall(internal::RpcServiceMethod* method,
+ ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
GPR_CODEGEN_ASSERT(method);
@@ -256,7 +260,7 @@ class ServerInterface : public CallHook {
}
void RequestAsyncGenericCall(GenericServerContext* context,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq,
void* tag) {
diff --git a/include/grpc++/impl/codegen/service_type.h b/include/grpc++/impl/codegen/service_type.h
index 2dc4ea0ea6..71c3d99d5c 100644
--- a/include/grpc++/impl/codegen/service_type.h
+++ b/include/grpc++/impl/codegen/service_type.h
@@ -28,13 +28,14 @@
namespace grpc {
-class Call;
class CompletionQueue;
class Server;
class ServerInterface;
class ServerCompletionQueue;
class ServerContext;
+namespace internal {
+class Call;
class ServerAsyncStreamingInterface {
public:
virtual ~ServerAsyncStreamingInterface() {}
@@ -48,9 +49,10 @@ class ServerAsyncStreamingInterface {
virtual void SendInitialMetadata(void* tag) = 0;
private:
- friend class ServerInterface;
+ friend class ::grpc::ServerInterface;
virtual void BindCall(Call* call) = 0;
};
+} // namespace internal
/// Desriptor of an RPC service and its various RPC methods
class Service {
@@ -88,40 +90,38 @@ class Service {
protected:
template <class Message>
void RequestAsyncUnary(int index, ServerContext* context, Message* request,
- ServerAsyncStreamingInterface* stream,
+ internal::ServerAsyncStreamingInterface* stream,
CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestAsyncClientStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncClientStreaming(
+ int index, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
template <class Message>
- void RequestAsyncServerStreaming(int index, ServerContext* context,
- Message* request,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncServerStreaming(
+ int index, ServerContext* context, Message* request,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag, request);
}
- void RequestAsyncBidiStreaming(int index, ServerContext* context,
- ServerAsyncStreamingInterface* stream,
- CompletionQueue* call_cq,
- ServerCompletionQueue* notification_cq,
- void* tag) {
+ void RequestAsyncBidiStreaming(
+ int index, ServerContext* context,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ ServerCompletionQueue* notification_cq, void* tag) {
server_->RequestAsyncCall(methods_[index].get(), context, stream, call_cq,
notification_cq, tag);
}
- void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
+ void AddMethod(internal::RpcServiceMethod* method) {
+ methods_.emplace_back(method);
+ }
void MarkMethodAsync(int index) {
GPR_CODEGEN_ASSERT(
@@ -139,7 +139,7 @@ class Service {
methods_[index].reset();
}
- void MarkMethodStreamed(int index, MethodHandler* streamed_method) {
+ void MarkMethodStreamed(int index, internal::MethodHandler* streamed_method) {
GPR_CODEGEN_ASSERT(methods_[index] && methods_[index]->handler() &&
"Cannot mark an async or generic method Streamed");
methods_[index]->SetHandler(streamed_method);
@@ -148,14 +148,14 @@ class Service {
// case of BIDI_STREAMING that has 1 read and 1 write, in that order,
// and split server-side streaming is BIDI_STREAMING with 1 read and
// any number of writes, in that order.
- methods_[index]->SetMethodType(::grpc::RpcMethod::BIDI_STREAMING);
+ methods_[index]->SetMethodType(internal::RpcMethod::BIDI_STREAMING);
}
private:
friend class Server;
friend class ServerInterface;
ServerInterface* server_;
- std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
+ std::vector<std::unique_ptr<internal::RpcServiceMethod>> methods_;
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index c1784f1820..a6dd26fb00 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -30,6 +30,7 @@
namespace grpc {
+namespace internal {
/// Common interface for all synchronous client side streaming.
class ClientStreamingInterface {
public:
@@ -141,10 +142,12 @@ class WriterInterface {
}
};
+} // namespace internal
+
/// Client-side interface for streaming reads of message of type \a R.
template <class R>
-class ClientReaderInterface : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReaderInterface : public internal::ClientStreamingInterface,
+ public internal::ReaderInterface<R> {
public:
/// Block to wait for initial metadata from server. The received metadata
/// can only be accessed after this call returns. Should only be called before
@@ -153,35 +156,25 @@ class ClientReaderInterface : public ClientStreamingInterface,
virtual void WaitForInitialMetadata() = 0;
};
+namespace internal {
+template <class R>
+class ClientReaderFactory {
+ public:
+ template <class W>
+ static ClientReader<R>* Create(ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request) {
+ return new ClientReader<R>(channel, method, context, request);
+ }
+};
+} // namespace internal
+
/// Synchronous (blocking) client-side API for doing server-streaming RPCs,
/// where the stream of messages coming from the server has messages
/// of type \a R.
template <class R>
class ClientReader final : public ClientReaderInterface<R> {
public:
- /// Block to create a stream and write the initial metadata and \a request
- /// out. Note that \a context will be used to fill in custom initial
- /// metadata used to send to the server when starting the call.
- template <class W>
- ClientReader(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, const W& request)
- : context_(context),
- cq_(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
- call_(channel->CreateCall(method, context, &cq_)) {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
- ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- // TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
- ops.ClientSendClose();
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
-
/// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
/// semantics.
///
@@ -192,7 +185,8 @@ class ClientReader final : public ClientReaderInterface<R> {
void WaitForInitialMetadata() override {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); /// status ignored
@@ -209,7 +203,9 @@ class ClientReader final : public ClientReaderInterface<R> {
/// already received (if initial metadata is received, it can be then
/// accessed through the \a ClientContext associated with this call).
bool Read(R* msg) override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -224,7 +220,7 @@ class ClientReader final : public ClientReaderInterface<R> {
/// The \a ClientContext associated with this call is updated with
/// possible metadata received from the server.
Status Finish() override {
- CallOpSet<CallOpClientRecvStatus> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientRecvStatus> ops;
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
@@ -233,15 +229,41 @@ class ClientReader final : public ClientReaderInterface<R> {
}
private:
+ friend class internal::ClientReaderFactory<R>;
ClientContext* context_;
CompletionQueue cq_;
- Call call_;
+ ::grpc::internal::Call call_;
+
+ /// Block to create a stream and write the initial metadata and \a request
+ /// out. Note that \a context will be used to fill in custom initial
+ /// metadata used to send to the server when starting the call.
+ template <class W>
+ ClientReader(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, const W& request)
+ : context_(context),
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
+ call_(channel->CreateCall(method, context, &cq_)) {
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(ops.SendMessage(request).ok());
+ ops.ClientSendClose();
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
};
/// Client-side interface for streaming writes of message type \a W.
template <class W>
-class ClientWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W> {
+class ClientWriterInterface : public internal::ClientStreamingInterface,
+ public internal::WriterInterface<W> {
public:
/// Half close writing from the client. (signal that the stream of messages
/// coming from the client is complete).
@@ -252,37 +274,25 @@ class ClientWriterInterface : public ClientStreamingInterface,
virtual bool WritesDone() = 0;
};
+namespace internal {
+template <class W>
+class ClientWriterFactory {
+ public:
+ template <class R>
+ static ClientWriter<W>* Create(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response) {
+ return new ClientWriter<W>(channel, method, context, response);
+ }
+};
+} // namespace internal
+
/// Synchronous (blocking) client-side API for doing client-streaming RPCs,
/// where the outgoing message stream coming from the client has messages of
/// type \a W.
template <class W>
class ClientWriter : public ClientWriterInterface<W> {
public:
- /// Block to create a stream (i.e. send request headers and other initial
- /// metadata to the server). Note that \a context will be used to fill
- /// in custom initial metadata. \a response will be filled in with the
- /// single expected response message from the server upon a successful
- /// call to the \a Finish method of this instance.
- template <class R>
- ClientWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context, R* response)
- : context_(context),
- cq_(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
- call_(channel->CreateCall(method, context, &cq_)) {
- finish_ops_.RecvMessage(response);
- finish_ops_.AllowNoMessage();
-
- if (!context_->initial_metadata_corked_) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
- }
-
/// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
/// semantics.
///
@@ -292,7 +302,8 @@ class ClientWriter : public ClientWriterInterface<W> {
void WaitForInitialMetadata() {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
@@ -304,10 +315,11 @@ class ClientWriter : public ClientWriterInterface<W> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call).
- using WriterInterface<W>::Write;
+ using ::grpc::internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
ops;
if (options.is_last_message()) {
@@ -328,7 +340,7 @@ class ClientWriter : public ClientWriterInterface<W> {
}
bool WritesDone() override {
- CallOpSet<CallOpClientSendClose> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
@@ -352,21 +364,51 @@ class ClientWriter : public ClientWriterInterface<W> {
}
private:
+ friend class internal::ClientWriterFactory<W>;
+
+ /// Block to create a stream (i.e. send request headers and other initial
+ /// metadata to the server). Note that \a context will be used to fill
+ /// in custom initial metadata. \a response will be filled in with the
+ /// single expected response message from the server upon a successful
+ /// call to the \a Finish method of this instance.
+ template <class R>
+ ClientWriter(ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context, R* response)
+ : context_(context),
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
+ call_(channel->CreateCall(method, context, &cq_)) {
+ finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
+
+ if (!context_->initial_metadata_corked_) {
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+ }
+
ClientContext* context_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
- CallOpClientRecvStatus>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpGenericRecvMessage,
+ ::grpc::internal::CallOpClientRecvStatus>
finish_ops_;
CompletionQueue cq_;
- Call call_;
+ ::grpc::internal::Call call_;
};
/// Client-side interface for bi-directional streaming with
/// client-to-server stream messages of type \a W and
/// server-to-client stream messages of type \a R.
template <class W, class R>
-class ClientReaderWriterInterface : public ClientStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {
+class ClientReaderWriterInterface : public internal::ClientStreamingInterface,
+ public internal::WriterInterface<W>,
+ public internal::ReaderInterface<R> {
public:
/// Block to wait for initial metadata from server. The received metadata
/// can only be accessed after this call returns. Should only be called before
@@ -375,7 +417,7 @@ class ClientReaderWriterInterface : public ClientStreamingInterface,
virtual void WaitForInitialMetadata() = 0;
/// Half close writing from the client. (signal that the stream of messages
- /// coming from the client is complete).
+ /// coming from the clinet is complete).
/// Blocks until currently-pending writes are completed.
/// Thread-safe with respect to \a ReaderInterface::Read
///
@@ -383,6 +425,18 @@ class ClientReaderWriterInterface : public ClientStreamingInterface,
virtual bool WritesDone() = 0;
};
+namespace internal {
+template <class W, class R>
+class ClientReaderWriterFactory {
+ public:
+ static ClientReaderWriter<W, R>* Create(
+ ::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method, ClientContext* context) {
+ return new ClientReaderWriter<W, R>(channel, method, context);
+ }
+};
+} // namespace internal
+
/// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
/// where the outgoing message stream coming from the client has messages of
/// type \a W, and the incoming messages stream coming from the server has
@@ -390,25 +444,6 @@ class ClientReaderWriterInterface : public ClientStreamingInterface,
template <class W, class R>
class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
public:
- /// Block to create a stream and write the initial metadata and \a request
- /// out. Note that \a context will be used to fill in custom initial metadata
- /// used to send to the server when starting the call.
- ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
- ClientContext* context)
- : context_(context),
- cq_(grpc_completion_queue_attributes{
- GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
- GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
- call_(channel->CreateCall(method, context, &cq_)) {
- if (!context_->initial_metadata_corked_) {
- CallOpSet<CallOpSendInitialMetadata> ops;
- ops.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&ops);
- cq_.Pluck(&ops);
- }
- }
-
/// Block waiting to read initial metadata from the server.
/// This call is optional, but if it is used, it cannot be used concurrently
/// with or after the \a Finish method.
@@ -418,7 +453,8 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
void WaitForInitialMetadata() override {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- CallOpSet<CallOpRecvInitialMetadata> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata>
+ ops;
ops.RecvInitialMetadata(context_);
call_.PerformOps(&ops);
cq_.Pluck(&ops); // status ignored
@@ -434,7 +470,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// Also receives initial metadata if not already received (updates the \a
/// ClientContext associated with this call in that case).
bool Read(R* msg) override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpRecvMessage<R>>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -448,10 +486,11 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call to fill in values).
- using WriterInterface<W>::Write;
+ using ::grpc::internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
- CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata,
+ ::grpc::internal::CallOpSendMessage,
+ ::grpc::internal::CallOpClientSendClose>
ops;
if (options.is_last_message()) {
@@ -472,7 +511,7 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
bool WritesDone() override {
- CallOpSet<CallOpClientSendClose> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpClientSendClose> ops;
ops.ClientSendClose();
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
@@ -484,7 +523,9 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
/// - the \a ClientContext associated with this call is updated with
/// possible trailing metadata sent from the server.
Status Finish() override {
- CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpRecvInitialMetadata,
+ ::grpc::internal::CallOpClientRecvStatus>
+ ops;
if (!context_->initial_metadata_received_) {
ops.RecvInitialMetadata(context_);
}
@@ -496,15 +537,38 @@ class ClientReaderWriter final : public ClientReaderWriterInterface<W, R> {
}
private:
+ friend class internal::ClientReaderWriterFactory<W, R>;
+
ClientContext* context_;
CompletionQueue cq_;
- Call call_;
+ ::grpc::internal::Call call_;
+
+ /// Block to create a stream and write the initial metadata and \a request
+ /// out. Note that \a context will be used to fill in custom initial metadata
+ /// used to send to the server when starting the call.
+ ClientReaderWriter(::grpc::ChannelInterface* channel,
+ const ::grpc::internal::RpcMethod& method,
+ ClientContext* context)
+ : context_(context),
+ cq_(grpc_completion_queue_attributes{
+ GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
+ GRPC_CQ_DEFAULT_POLLING}), // Pluckable cq
+ call_(channel->CreateCall(method, context, &cq_)) {
+ if (!context_->initial_metadata_corked_) {
+ ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
+ ops;
+ ops.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops);
+ }
+ }
};
/// Server-side interface for streaming reads of message of type \a R.
template <class R>
-class ServerReaderInterface : public ServerStreamingInterface,
- public ReaderInterface<R> {};
+class ServerReaderInterface : public internal::ServerStreamingInterface,
+ public internal::ReaderInterface<R> {};
/// Synchronous (blocking) server-side API for doing client-streaming RPCs,
/// where the incoming message stream coming from the client has messages of
@@ -512,15 +576,13 @@ class ServerReaderInterface : public ServerStreamingInterface,
template <class R>
class ServerReader final : public ServerReaderInterface<R> {
public:
- ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics. Note that initial metadata will be affected by the
/// \a ServerContext associated with this call.
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata> ops;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@@ -537,21 +599,27 @@ class ServerReader final : public ServerReaderInterface<R> {
}
bool Read(R* msg) override {
- CallOpSet<CallOpRecvMessage<R>> ops;
+ internal::CallOpSet<internal::CallOpRecvMessage<R>> ops;
ops.RecvMessage(msg);
call_->PerformOps(&ops);
return call_->cq()->Pluck(&ops) && ops.got_message;
}
private:
- Call* const call_;
+ internal::Call* const call_;
ServerContext* const ctx_;
+
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class internal::ClientStreamingHandler;
+
+ ServerReader(internal::Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx) {}
};
/// Server-side interface for streaming writes of message of type \a W.
template <class W>
-class ServerWriterInterface : public ServerStreamingInterface,
- public WriterInterface<W> {};
+class ServerWriterInterface : public internal::ServerStreamingInterface,
+ public internal::WriterInterface<W> {};
/// Synchronous (blocking) server-side API for doing for doing a
/// server-streaming RPCs, where the outgoing message stream coming from the
@@ -559,8 +627,6 @@ class ServerWriterInterface : public ServerStreamingInterface,
template <class W>
class ServerWriter final : public ServerWriterInterface<W> {
public:
- ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics.
/// Note that initial metadata will be affected by the
@@ -568,7 +634,7 @@ class ServerWriter final : public ServerWriterInterface<W> {
void SendInitialMetadata() override {
GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
- CallOpSet<CallOpSendInitialMetadata> ops;
+ internal::CallOpSet<internal::CallOpSendInitialMetadata> ops;
ops.SendInitialMetadata(ctx_->initial_metadata_,
ctx_->initial_metadata_flags());
if (ctx_->compression_level_set()) {
@@ -584,11 +650,12 @@ class ServerWriter final : public ServerWriterInterface<W> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the
/// \a ClientContext associated with this call to fill in values).
- using WriterInterface<W>::Write;
+ using internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
if (options.is_last_message()) {
options.set_buffer_hint();
}
+
if (!ctx_->pending_ops_.SendMessage(msg, options).ok()) {
return false;
}
@@ -613,15 +680,21 @@ class ServerWriter final : public ServerWriterInterface<W> {
}
private:
- Call* const call_;
+ internal::Call* const call_;
ServerContext* const ctx_;
+
+ template <class ServiceType, class RequestType, class ResponseType>
+ friend class internal::ServerStreamingHandler;
+
+ ServerWriter(internal::Call* call, ServerContext* ctx)
+ : call_(call), ctx_(ctx) {}
};
/// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriterInterface : public ServerStreamingInterface,
- public WriterInterface<W>,
- public ReaderInterface<R> {};
+class ServerReaderWriterInterface : public internal::ServerStreamingInterface,
+ public internal::WriterInterface<W>,
+ public internal::ReaderInterface<R> {};
/// Actual implementation of bi-directional streaming
namespace internal {
@@ -688,6 +761,7 @@ class ServerReaderWriterBody final {
Call* const call_;
ServerContext* const ctx_;
};
+
} // namespace internal
/// Synchronous (blocking) server-side API for a bidirectional
@@ -697,8 +771,6 @@ class ServerReaderWriterBody final {
template <class W, class R>
class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
public:
- ServerReaderWriter(Call* call, ServerContext* ctx) : body_(call, ctx) {}
-
/// See the \a ServerStreamingInterface.SendInitialMetadata method
/// for semantics. Note that initial metadata will be affected by the
/// \a ServerContext associated with this call.
@@ -715,13 +787,18 @@ class ServerReaderWriter final : public ServerReaderWriterInterface<W, R> {
/// Side effect:
/// Also sends initial metadata if not already sent (using the \a
/// ServerContext associated with this call).
- using WriterInterface<W>::Write;
+ using internal::WriterInterface<W>::Write;
bool Write(const W& msg, WriteOptions options) override {
return body_.Write(msg, options);
}
private:
internal::ServerReaderWriterBody<W, R> body_;
+
+ friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>,
+ false>;
+ ServerReaderWriter(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx) {}
};
/// A class to represent a flow-controlled unary call. This is something
@@ -736,9 +813,6 @@ template <class RequestType, class ResponseType>
class ServerUnaryStreamer final
: public ServerReaderWriterInterface<ResponseType, RequestType> {
public:
- ServerUnaryStreamer(Call* call, ServerContext* ctx)
- : body_(call, ctx), read_done_(false), write_done_(false) {}
-
/// Block to send initial metadata to client.
/// Implicit input parameter:
/// - the \a ServerContext associated with this call will be used for
@@ -775,7 +849,7 @@ class ServerUnaryStreamer final
/// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- using WriterInterface<ResponseType>::Write;
+ using internal::WriterInterface<ResponseType>::Write;
bool Write(const ResponseType& response, WriteOptions options) override {
if (write_done_ || !read_done_) {
return false;
@@ -788,6 +862,11 @@ class ServerUnaryStreamer final
internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
bool read_done_;
bool write_done_;
+
+ friend class internal::TemplatedBidiStreamingHandler<
+ ServerUnaryStreamer<RequestType, ResponseType>, true>;
+ ServerUnaryStreamer(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx), read_done_(false), write_done_(false) {}
};
/// A class to represent a flow-controlled server-side streaming call.
@@ -799,9 +878,6 @@ template <class RequestType, class ResponseType>
class ServerSplitStreamer final
: public ServerReaderWriterInterface<ResponseType, RequestType> {
public:
- ServerSplitStreamer(Call* call, ServerContext* ctx)
- : body_(call, ctx), read_done_(false) {}
-
/// Block to send initial metadata to client.
/// Implicit input parameter:
/// - the \a ServerContext associated with this call will be used for
@@ -838,7 +914,7 @@ class ServerSplitStreamer final
/// \param options The WriteOptions affecting the write operation.
///
/// \return \a true on success, \a false when the stream has been closed.
- using WriterInterface<ResponseType>::Write;
+ using internal::WriterInterface<ResponseType>::Write;
bool Write(const ResponseType& response, WriteOptions options) override {
return read_done_ && body_.Write(response, options);
}
@@ -846,6 +922,11 @@ class ServerSplitStreamer final
private:
internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
bool read_done_;
+
+ friend class internal::TemplatedBidiStreamingHandler<
+ ServerSplitStreamer<RequestType, ResponseType>, false>;
+ ServerSplitStreamer(internal::Call* call, ServerContext* ctx)
+ : body_(call, ctx), read_done_(false) {}
};
} // namespace grpc
diff --git a/include/grpc++/impl/codegen/time.h b/include/grpc++/impl/codegen/time.h
index 589deb4f03..d464d6ea13 100644
--- a/include/grpc++/impl/codegen/time.h
+++ b/include/grpc++/impl/codegen/time.h
@@ -19,6 +19,8 @@
#ifndef GRPCXX_IMPL_CODEGEN_TIME_H
#define GRPCXX_IMPL_CODEGEN_TIME_H
+#include <chrono>
+
#include <grpc++/impl/codegen/config.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -59,10 +61,6 @@ class TimePoint<gpr_timespec> {
} // namespace grpc
-#include <chrono>
-
-#include <grpc/impl/codegen/grpc_types.h>
-
namespace grpc {
// from and to should be absolute time.
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 0a3aae8241..01c4a60d21 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -175,7 +175,8 @@ class Server final : public ServerInterface, private GrpcLibraryCodegen {
/// \param num_cqs How many completion queues does \a cqs hold.
void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
- void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) override;
+ void PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) override;
void ShutdownInternal(gpr_timespec deadline) override;
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index a948abedb5..0888bef0d9 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -40,7 +40,6 @@ namespace grpc {
class AsyncGenericService;
class ResourceQuota;
class CompletionQueue;
-class RpcService;
class Server;
class ServerCompletionQueue;
class ServerCredentials;
diff --git a/include/grpc/grpc_security.h b/include/grpc/grpc_security.h
index 95b1447935..758aaf5b6c 100644
--- a/include/grpc/grpc_security.h
+++ b/include/grpc/grpc_security.h
@@ -316,6 +316,43 @@ typedef struct grpc_server_credentials grpc_server_credentials;
*/
GRPCAPI void grpc_server_credentials_release(grpc_server_credentials *creds);
+/** Server certificate config object holds the server's public certificates and
+ associated private keys, as well as any CA certificates needed for client
+ certificate validation (if applicable). Create using
+ grpc_ssl_server_certificate_config_create(). */
+typedef struct grpc_ssl_server_certificate_config
+ grpc_ssl_server_certificate_config;
+
+/** Creates a grpc_ssl_server_certificate_config object.
+ - pem_roots_cert is the NULL-terminated string containing the PEM encoding of
+ the client root certificates. This parameter may be NULL if the server does
+ not want the client to be authenticated with SSL.
+ - pem_key_cert_pairs is an array private key / certificate chains of the
+ server. This parameter cannot be NULL.
+ - num_key_cert_pairs indicates the number of items in the private_key_files
+ and cert_chain_files parameters. It must be at least 1.
+ - It is the caller's responsibility to free this object via
+ grpc_ssl_server_certificate_config_destroy(). */
+GRPCAPI grpc_ssl_server_certificate_config *
+grpc_ssl_server_certificate_config_create(
+ const char *pem_root_certs,
+ const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
+ size_t num_key_cert_pairs);
+
+/** Destroys a grpc_ssl_server_certificate_config object. */
+GRPCAPI void grpc_ssl_server_certificate_config_destroy(
+ grpc_ssl_server_certificate_config *config);
+
+/** Callback to retrieve updated SSL server certificates, private keys, and
+ trusted CAs (for client authentication).
+ - user_data parameter, if not NULL, contains opaque data to be used by the
+ callback.
+ - Use grpc_ssl_server_certificate_config_create to create the config.
+ - The caller assumes ownership of the config. */
+typedef grpc_ssl_certificate_config_reload_status (
+ *grpc_ssl_server_certificate_config_callback)(
+ void *user_data, grpc_ssl_server_certificate_config **config);
+
/** Deprecated in favor of grpc_ssl_server_credentials_create_ex.
Creates an SSL server_credentials object.
- pem_roots_cert is the NULL-terminated string containing the PEM encoding of
@@ -332,7 +369,8 @@ GRPCAPI grpc_server_credentials *grpc_ssl_server_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved);
-/** Same as grpc_ssl_server_credentials_create method except uses
+/** Deprecated in favor of grpc_ssl_server_credentials_create_with_options.
+ Same as grpc_ssl_server_credentials_create method except uses
grpc_ssl_client_certificate_request_type enum to support more ways to
authenticate client cerificates.*/
GRPCAPI grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
@@ -341,6 +379,40 @@ GRPCAPI grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
grpc_ssl_client_certificate_request_type client_certificate_request,
void *reserved);
+typedef struct grpc_ssl_server_credentials_options
+ grpc_ssl_server_credentials_options;
+
+/** Creates an options object using a certificate config. Use this method when
+ the certificates and keys of the SSL server will not change during the
+ server's lifetime.
+ - Takes ownership of the certificate_config parameter. */
+GRPCAPI grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config *certificate_config);
+
+/** Creates an options object using a certificate config fetcher. Use this
+ method to reload the certificates and keys of the SSL server without
+ interrupting the operation of the server. Initial certificate config will be
+ fetched during server initialization.
+ - user_data parameter, if not NULL, contains opaque data which will be passed
+ to the fetcher (see definition of
+ grpc_ssl_server_certificate_config_callback). */
+GRPCAPI grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config_callback cb, void *user_data);
+
+/** Destroys a grpc_ssl_server_credentials_options object. */
+GRPCAPI void grpc_ssl_server_credentials_options_destroy(
+ grpc_ssl_server_credentials_options *options);
+
+/** Creates an SSL server_credentials object using the provided options struct.
+ - Takes ownership of the options parameter. */
+GRPCAPI grpc_server_credentials *
+grpc_ssl_server_credentials_create_with_options(
+ grpc_ssl_server_credentials_options *options);
+
/** --- Server-side secure ports. --- */
/** Add a HTTP2 over an encrypted link over tcp listener.
diff --git a/include/grpc/grpc_security_constants.h b/include/grpc/grpc_security_constants.h
index fde300dfb1..60e167eb88 100644
--- a/include/grpc/grpc_security_constants.h
+++ b/include/grpc/grpc_security_constants.h
@@ -48,6 +48,13 @@ typedef enum {
GRPC_SSL_ROOTS_OVERRIDE_FAIL
} grpc_ssl_roots_override_result;
+/** Callback results for dynamically loading a SSL certificate config. */
+typedef enum {
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED,
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW,
+ GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_FAIL
+} grpc_ssl_certificate_config_reload_status;
+
typedef enum {
/** Server does not request client certificate. A client can present a self
signed or signed certificates if it wishes to do so and they would be
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index c2db8eff71..253280bd24 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -140,7 +140,6 @@ grpc::string GetHeaderIncludes(grpc_generator::File *file,
printer->Print(vars, "namespace grpc {\n");
printer->Print(vars, "class CompletionQueue;\n");
printer->Print(vars, "class Channel;\n");
- printer->Print(vars, "class RpcService;\n");
printer->Print(vars, "class ServerCompletionQueue;\n");
printer->Print(vars, "class ServerContext;\n");
printer->Print(vars, "} // namespace grpc\n\n");
@@ -324,7 +323,8 @@ void PrintHeaderClientMethodInterfaces(
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
- "virtual ::grpc::ClientReaderInterface< $Response$>* $Method$Raw("
+ "virtual ::grpc::ClientReaderInterface< $Response$>* "
+ "$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) = 0;\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
@@ -546,7 +546,8 @@ void PrintHeaderClientMethodData(grpc_generator::Printer *printer,
const grpc_generator::Method *method,
std::map<grpc::string, grpc::string> *vars) {
(*vars)["Method"] = method->name();
- printer->Print(*vars, "const ::grpc::RpcMethod rpcmethod_$Method$_;\n");
+ printer->Print(*vars,
+ "const ::grpc::internal::RpcMethod rpcmethod_$Method$_;\n");
}
void PrintHeaderServerMethodSync(grpc_generator::Printer *printer,
@@ -718,7 +719,7 @@ void PrintHeaderServerMethodStreamedUnary(
printer->Print(*vars,
"WithStreamedUnaryMethod_$Method$() {\n"
" ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::StreamedUnaryHandler< $Request$, "
+ " new ::grpc::internal::StreamedUnaryHandler< $Request$, "
"$Response$>(std::bind"
"(&WithStreamedUnaryMethod_$Method$<BaseClass>::"
"Streamed$Method$, this, std::placeholders::_1, "
@@ -766,15 +767,16 @@ void PrintHeaderServerMethodSplitStreaming(
"{}\n");
printer->Print(" public:\n");
printer->Indent();
- printer->Print(*vars,
- "WithSplitStreamingMethod_$Method$() {\n"
- " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
- " new ::grpc::SplitServerStreamingHandler< $Request$, "
- "$Response$>(std::bind"
- "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
- "Streamed$Method$, this, std::placeholders::_1, "
- "std::placeholders::_2)));\n"
- "}\n");
+ printer->Print(
+ *vars,
+ "WithSplitStreamingMethod_$Method$() {\n"
+ " ::grpc::Service::MarkMethodStreamed($Idx$,\n"
+ " new ::grpc::internal::SplitServerStreamingHandler< $Request$, "
+ "$Response$>(std::bind"
+ "(&WithSplitStreamingMethod_$Method$<BaseClass>::"
+ "Streamed$Method$, this, std::placeholders::_1, "
+ "std::placeholders::_2)));\n"
+ "}\n");
printer->Print(*vars,
"~WithSplitStreamingMethod_$Method$() override {\n"
" BaseClassMustBeDerivedFromService(this);\n"
@@ -914,7 +916,8 @@ void PrintHeaderService(grpc_generator::Printer *printer,
" {\n public:\n");
printer->Indent();
printer->Print(
- "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& channel);\n");
+ "Stub(const std::shared_ptr< ::grpc::ChannelInterface>& "
+ "channel);\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintHeaderClientMethod(printer, service->method(i).get(), vars, true);
}
@@ -1185,10 +1188,9 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
- " return ::grpc::BlockingUnaryCall(channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request, response);\n"
- "}\n\n");
+ " return ::grpc::internal::BlockingUnaryCall"
+ "(channel_.get(), rpcmethod_$Method$_, "
+ "context, request, response);\n}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1198,25 +1200,27 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"ClientContext* context, "
"const $Request$& request, "
"::grpc::CompletionQueue* cq) {\n");
- printer->Print(*vars,
- " return "
- "::grpc::ClientAsyncResponseReader< $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, request, $AsyncStart$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return "
+ "::grpc::internal::ClientAsyncResponseReaderFactory< $Response$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, request, $AsyncStart$);\n"
+ "}\n\n");
}
} else if (ClientOnlyStreaming(method)) {
printer->Print(*vars,
"::grpc::ClientWriter< $Request$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, $Response$* response) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientWriter< $Request$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, response);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientWriterFactory< $Request$>::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, response);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1227,12 +1231,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, $Response$* response, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(*vars,
- " return ::grpc::ClientAsyncWriter< $Request$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, response, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientAsyncWriterFactory< $Request$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, response, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
} else if (ServerOnlyStreaming(method)) {
printer->Print(
@@ -1240,12 +1245,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReader< $Response$>* "
"$ns$$Service$::Stub::$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request) {\n");
- printer->Print(*vars,
- " return new ::grpc::ClientReader< $Response$>("
- "channel_.get(), "
- "rpcmethod_$Method$_, "
- "context, request);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientReaderFactory< $Response$>::Create("
+ "channel_.get(), "
+ "rpcmethod_$Method$_, "
+ "context, request);\n"
+ "}\n\n");
for (auto async_prefix : async_prefixes) {
(*vars)["AsyncPrefix"] = async_prefix.prefix;
(*vars)["AsyncStart"] = async_prefix.start;
@@ -1257,12 +1263,13 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw("
"::grpc::ClientContext* context, const $Request$& request, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(*vars,
- " return ::grpc::ClientAsyncReader< $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, request, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(
+ *vars,
+ " return ::grpc::internal::ClientAsyncReaderFactory< $Response$>"
+ "::Create(channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, request, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
} else if (method->BidiStreaming()) {
printer->Print(
@@ -1270,8 +1277,8 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
"$ns$$Service$::Stub::$Method$Raw(::grpc::ClientContext* context) {\n");
printer->Print(*vars,
- " return new ::grpc::ClientReaderWriter< "
- "$Request$, $Response$>("
+ " return ::grpc::internal::ClientReaderWriterFactory< "
+ "$Request$, $Response$>::Create("
"channel_.get(), "
"rpcmethod_$Method$_, "
"context);\n"
@@ -1286,14 +1293,14 @@ void PrintSourceClientMethod(grpc_generator::Printer *printer,
"$ns$$Service$::Stub::$AsyncPrefix$$Method$Raw(::grpc::"
"ClientContext* context, "
"::grpc::CompletionQueue* cq$AsyncMethodParams$) {\n");
- printer->Print(
- *vars,
- " return "
- "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>::Create("
- "channel_.get(), cq, "
- "rpcmethod_$Method$_, "
- "context, $AsyncStart$$AsyncCreateArgs$);\n"
- "}\n\n");
+ printer->Print(*vars,
+ " return "
+ "::grpc::internal::ClientAsyncReaderWriterFactory< "
+ "$Request$, $Response$>::Create("
+ "channel_.get(), cq, "
+ "rpcmethod_$Method$_, "
+ "context, $AsyncStart$$AsyncCreateArgs$);\n"
+ "}\n\n");
}
}
}
@@ -1404,7 +1411,7 @@ void PrintSourceService(grpc_generator::Printer *printer,
printer->Print(*vars,
", rpcmethod_$Method$_("
"$prefix$$Service$_method_names[$Idx$], "
- "::grpc::RpcMethod::$StreamingType$, "
+ "::grpc::internal::RpcMethod::$StreamingType$, "
"channel"
")\n");
}
@@ -1427,38 +1434,38 @@ void PrintSourceService(grpc_generator::Printer *printer,
if (method->NoStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::NORMAL_RPC,\n"
- " new ::grpc::RpcMethodHandler< $ns$$Service$::Service, "
+ " ::grpc::internal::RpcMethod::NORMAL_RPC,\n"
+ " new ::grpc::internal::RpcMethodHandler< $ns$$Service$::Service, "
"$Request$, "
"$Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ClientOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::CLIENT_STREAMING,\n"
- " new ::grpc::ClientStreamingHandler< "
+ " ::grpc::internal::RpcMethod::CLIENT_STREAMING,\n"
+ " new ::grpc::internal::ClientStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (ServerOnlyStreaming(method.get())) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::SERVER_STREAMING,\n"
- " new ::grpc::ServerStreamingHandler< "
+ " ::grpc::internal::RpcMethod::SERVER_STREAMING,\n"
+ " new ::grpc::internal::ServerStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
} else if (method->BidiStreaming()) {
printer->Print(
*vars,
- "AddMethod(new ::grpc::RpcServiceMethod(\n"
+ "AddMethod(new ::grpc::internal::RpcServiceMethod(\n"
" $prefix$$Service$_method_names[$Idx$],\n"
- " ::grpc::RpcMethod::BIDI_STREAMING,\n"
- " new ::grpc::BidiStreamingHandler< "
+ " ::grpc::internal::RpcMethod::BIDI_STREAMING,\n"
+ " new ::grpc::internal::BidiStreamingHandler< "
"$ns$$Service$::Service, $Request$, $Response$>(\n"
" std::mem_fn(&$ns$$Service$::Service::$Method$), this)));\n");
}
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.cc b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
index 8e47aebedb..2085e2b8e7 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.cc
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.cc
@@ -119,6 +119,12 @@ grpc_channel_credentials *grpc_ssl_credentials_create(
// SSL Server Credentials.
//
+struct grpc_ssl_server_credentials_options {
+ grpc_ssl_client_certificate_request_type client_certificate_request;
+ grpc_ssl_server_certificate_config *certificate_config;
+ grpc_ssl_server_certificate_config_fetcher *certificate_config_fetcher;
+};
+
static void ssl_server_destruct(grpc_exec_ctx *exec_ctx,
grpc_server_credentials *creds) {
grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
@@ -130,9 +136,7 @@ static void ssl_server_destruct(grpc_exec_ctx *exec_ctx,
static grpc_security_status ssl_server_create_security_connector(
grpc_exec_ctx *exec_ctx, grpc_server_credentials *creds,
grpc_server_security_connector **sc) {
- grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)creds;
- return grpc_ssl_server_security_connector_create(exec_ctx, creds, &c->config,
- sc);
+ return grpc_ssl_server_security_connector_create(exec_ctx, creds, sc);
}
static grpc_server_credentials_vtable ssl_server_vtable = {
@@ -170,6 +174,86 @@ static void ssl_build_server_config(
config->num_key_cert_pairs = num_key_cert_pairs;
}
+grpc_ssl_server_certificate_config *grpc_ssl_server_certificate_config_create(
+ const char *pem_root_certs,
+ const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
+ size_t num_key_cert_pairs) {
+ grpc_ssl_server_certificate_config *config =
+ (grpc_ssl_server_certificate_config *)gpr_zalloc(
+ sizeof(grpc_ssl_server_certificate_config));
+ if (pem_root_certs != NULL) {
+ config->pem_root_certs = gpr_strdup(pem_root_certs);
+ }
+ if (num_key_cert_pairs > 0) {
+ GPR_ASSERT(pem_key_cert_pairs != NULL);
+ config->pem_key_cert_pairs = (grpc_ssl_pem_key_cert_pair *)gpr_zalloc(
+ num_key_cert_pairs * sizeof(grpc_ssl_pem_key_cert_pair));
+ }
+ config->num_key_cert_pairs = num_key_cert_pairs;
+ for (size_t i = 0; i < num_key_cert_pairs; i++) {
+ GPR_ASSERT(pem_key_cert_pairs[i].private_key != NULL);
+ GPR_ASSERT(pem_key_cert_pairs[i].cert_chain != NULL);
+ config->pem_key_cert_pairs[i].cert_chain =
+ gpr_strdup(pem_key_cert_pairs[i].cert_chain);
+ config->pem_key_cert_pairs[i].private_key =
+ gpr_strdup(pem_key_cert_pairs[i].private_key);
+ }
+ return config;
+}
+
+void grpc_ssl_server_certificate_config_destroy(
+ grpc_ssl_server_certificate_config *config) {
+ if (config == NULL) return;
+ for (size_t i = 0; i < config->num_key_cert_pairs; i++) {
+ gpr_free((void *)config->pem_key_cert_pairs[i].private_key);
+ gpr_free((void *)config->pem_key_cert_pairs[i].cert_chain);
+ }
+ gpr_free(config->pem_key_cert_pairs);
+ gpr_free(config->pem_root_certs);
+ gpr_free(config);
+}
+
+grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config *config) {
+ grpc_ssl_server_credentials_options *options = NULL;
+ if (config == NULL) {
+ gpr_log(GPR_ERROR, "Certificate config must not be NULL.");
+ goto done;
+ }
+ options = (grpc_ssl_server_credentials_options *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials_options));
+ options->client_certificate_request = client_certificate_request;
+ options->certificate_config = config;
+done:
+ return options;
+}
+
+grpc_ssl_server_credentials_options *
+grpc_ssl_server_credentials_create_options_using_config_fetcher(
+ grpc_ssl_client_certificate_request_type client_certificate_request,
+ grpc_ssl_server_certificate_config_callback cb, void *user_data) {
+ if (cb == NULL) {
+ gpr_log(GPR_ERROR, "Invalid certificate config callback parameter.");
+ return NULL;
+ }
+
+ grpc_ssl_server_certificate_config_fetcher *fetcher =
+ (grpc_ssl_server_certificate_config_fetcher *)gpr_zalloc(
+ sizeof(grpc_ssl_server_certificate_config_fetcher));
+ fetcher->cb = cb;
+ fetcher->user_data = user_data;
+
+ grpc_ssl_server_credentials_options *options =
+ (grpc_ssl_server_credentials_options *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials_options));
+ options->client_certificate_request = client_certificate_request;
+ options->certificate_config_fetcher = fetcher;
+
+ return options;
+}
+
grpc_server_credentials *grpc_ssl_server_credentials_create(
const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs,
size_t num_key_cert_pairs, int force_client_auth, void *reserved) {
@@ -186,8 +270,6 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
size_t num_key_cert_pairs,
grpc_ssl_client_certificate_request_type client_certificate_request,
void *reserved) {
- grpc_ssl_server_credentials *c = (grpc_ssl_server_credentials *)gpr_zalloc(
- sizeof(grpc_ssl_server_credentials));
GRPC_API_TRACE(
"grpc_ssl_server_credentials_create_ex("
"pem_root_certs=%s, pem_key_cert_pairs=%p, num_key_cert_pairs=%lu, "
@@ -195,11 +277,67 @@ grpc_server_credentials *grpc_ssl_server_credentials_create_ex(
5, (pem_root_certs, pem_key_cert_pairs, (unsigned long)num_key_cert_pairs,
client_certificate_request, reserved));
GPR_ASSERT(reserved == NULL);
+
+ grpc_ssl_server_certificate_config *cert_config =
+ grpc_ssl_server_certificate_config_create(
+ pem_root_certs, pem_key_cert_pairs, num_key_cert_pairs);
+ grpc_ssl_server_credentials_options *options =
+ grpc_ssl_server_credentials_create_options_using_config(
+ client_certificate_request, cert_config);
+
+ return grpc_ssl_server_credentials_create_with_options(options);
+}
+
+grpc_server_credentials *grpc_ssl_server_credentials_create_with_options(
+ grpc_ssl_server_credentials_options *options) {
+ grpc_server_credentials *retval = NULL;
+ grpc_ssl_server_credentials *c = NULL;
+
+ if (options == NULL) {
+ gpr_log(GPR_ERROR,
+ "Invalid options trying to create SSL server credentials.");
+ goto done;
+ }
+
+ if (options->certificate_config == NULL &&
+ options->certificate_config_fetcher == NULL) {
+ gpr_log(GPR_ERROR,
+ "SSL server credentials options must specify either "
+ "certificate config or fetcher.");
+ goto done;
+ } else if (options->certificate_config_fetcher != NULL &&
+ options->certificate_config_fetcher->cb == NULL) {
+ gpr_log(GPR_ERROR, "Certificate config fetcher callback must not be NULL.");
+ goto done;
+ }
+
+ c = (grpc_ssl_server_credentials *)gpr_zalloc(
+ sizeof(grpc_ssl_server_credentials));
c->base.type = GRPC_CHANNEL_CREDENTIALS_TYPE_SSL;
gpr_ref_init(&c->base.refcount, 1);
c->base.vtable = &ssl_server_vtable;
- ssl_build_server_config(pem_root_certs, pem_key_cert_pairs,
- num_key_cert_pairs, client_certificate_request,
- &c->config);
- return &c->base;
+
+ if (options->certificate_config_fetcher != NULL) {
+ c->config.client_certificate_request = options->client_certificate_request;
+ c->certificate_config_fetcher = *options->certificate_config_fetcher;
+ } else {
+ ssl_build_server_config(options->certificate_config->pem_root_certs,
+ options->certificate_config->pem_key_cert_pairs,
+ options->certificate_config->num_key_cert_pairs,
+ options->client_certificate_request, &c->config);
+ }
+
+ retval = &c->base;
+
+done:
+ grpc_ssl_server_credentials_options_destroy(options);
+ return retval;
+}
+
+void grpc_ssl_server_credentials_options_destroy(
+ grpc_ssl_server_credentials_options *o) {
+ if (o == NULL) return;
+ gpr_free(o->certificate_config_fetcher);
+ grpc_ssl_server_certificate_config_destroy(o->certificate_config);
+ gpr_free(o);
}
diff --git a/src/core/lib/security/credentials/ssl/ssl_credentials.h b/src/core/lib/security/credentials/ssl/ssl_credentials.h
index 42e425d9f1..5542484aae 100644
--- a/src/core/lib/security/credentials/ssl/ssl_credentials.h
+++ b/src/core/lib/security/credentials/ssl/ssl_credentials.h
@@ -29,9 +29,21 @@ typedef struct {
grpc_ssl_config config;
} grpc_ssl_credentials;
+struct grpc_ssl_server_certificate_config {
+ grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs;
+ size_t num_key_cert_pairs;
+ char *pem_root_certs;
+};
+
+typedef struct {
+ grpc_ssl_server_certificate_config_callback cb;
+ void *user_data;
+} grpc_ssl_server_certificate_config_fetcher;
+
typedef struct {
grpc_server_credentials base;
grpc_ssl_server_config config;
+ grpc_ssl_server_certificate_config_fetcher certificate_config_fetcher;
} grpc_ssl_server_credentials;
tsi_ssl_pem_key_cert_pair *grpc_convert_grpc_to_tsi_cert_pairs(
diff --git a/src/core/lib/security/transport/security_connector.cc b/src/core/lib/security/transport/security_connector.cc
index b050be2129..06160d0caa 100644
--- a/src/core/lib/security/transport/security_connector.cc
+++ b/src/core/lib/security/transport/security_connector.cc
@@ -34,6 +34,7 @@
#include "src/core/lib/security/context/security_context.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
+#include "src/core/lib/security/credentials/ssl/ssl_credentials.h"
#include "src/core/lib/security/transport/lb_targets_info.h"
#include "src/core/lib/security/transport/secure_endpoint.h"
#include "src/core/lib/security/transport/security_handshaker.h"
@@ -277,6 +278,30 @@ grpc_security_connector *grpc_security_connector_find_in_args(
return NULL;
}
+static tsi_client_certificate_request_type
+get_tsi_client_certificate_request_type(
+ grpc_ssl_client_certificate_request_type grpc_request_type) {
+ switch (grpc_request_type) {
+ case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE:
+ return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
+
+ case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
+ return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
+
+ case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY:
+ return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
+
+ case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
+ return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
+
+ case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY:
+ return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
+
+ default:
+ return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
+ }
+}
+
/* -- Fake implementation. -- */
typedef struct {
@@ -533,6 +558,15 @@ typedef struct {
tsi_ssl_server_handshaker_factory *server_handshaker_factory;
} grpc_ssl_server_security_connector;
+static bool server_connector_has_cert_config_fetcher(
+ grpc_ssl_server_security_connector *c) {
+ GPR_ASSERT(c != NULL);
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)c->base.server_creds;
+ GPR_ASSERT(server_creds != NULL);
+ return server_creds->certificate_config_fetcher.cb != NULL;
+}
+
static void ssl_channel_destroy(grpc_exec_ctx *exec_ctx,
grpc_security_connector *sc) {
grpc_ssl_channel_security_connector *c =
@@ -573,7 +607,6 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
tsi_result_to_string(result));
return;
}
-
// Create handshakers.
grpc_handshake_manager_add(
handshake_mgr,
@@ -581,12 +614,102 @@ static void ssl_channel_add_handshakers(grpc_exec_ctx *exec_ctx,
exec_ctx, tsi_create_adapter_handshaker(tsi_hs), &sc->base));
}
+static const char **fill_alpn_protocol_strings(size_t *num_alpn_protocols) {
+ GPR_ASSERT(num_alpn_protocols != NULL);
+ *num_alpn_protocols = grpc_chttp2_num_alpn_versions();
+ const char **alpn_protocol_strings =
+ (const char **)gpr_malloc(sizeof(const char *) * (*num_alpn_protocols));
+ for (size_t i = 0; i < *num_alpn_protocols; i++) {
+ alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
+ }
+ return alpn_protocol_strings;
+}
+
+/* Attempts to replace the server_handshaker_factory with a new factory using
+ * the provided grpc_ssl_server_certificate_config. Should new factory creation
+ * fail, the existing factory will not be replaced. Returns true on success (new
+ * factory created). */
+static bool try_replace_server_handshaker_factory(
+ grpc_ssl_server_security_connector *sc,
+ const grpc_ssl_server_certificate_config *config) {
+ if (config == NULL) {
+ gpr_log(GPR_ERROR,
+ "Server certificate config callback returned invalid (NULL) "
+ "config.");
+ return false;
+ }
+ gpr_log(GPR_DEBUG, "Using new server certificate config (%p).", config);
+
+ size_t num_alpn_protocols = 0;
+ const char **alpn_protocol_strings =
+ fill_alpn_protocol_strings(&num_alpn_protocols);
+ tsi_ssl_pem_key_cert_pair *cert_pairs = grpc_convert_grpc_to_tsi_cert_pairs(
+ config->pem_key_cert_pairs, config->num_key_cert_pairs);
+ tsi_ssl_server_handshaker_factory *new_handshaker_factory = NULL;
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)sc->base.server_creds;
+ tsi_result result = tsi_create_ssl_server_handshaker_factory_ex(
+ cert_pairs, config->num_key_cert_pairs, config->pem_root_certs,
+ get_tsi_client_certificate_request_type(
+ server_creds->config.client_certificate_request),
+ ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols,
+ &new_handshaker_factory);
+ gpr_free(cert_pairs);
+ gpr_free((void *)alpn_protocol_strings);
+
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
+ tsi_result_to_string(result));
+ return false;
+ }
+ tsi_ssl_server_handshaker_factory_unref(sc->server_handshaker_factory);
+ sc->server_handshaker_factory = new_handshaker_factory;
+ return true;
+}
+
+/* Attempts to fetch the server certificate config if a callback is available.
+ * Current certificate config will continue to be used if the callback returns
+ * an error. Returns true if new credentials were sucessfully loaded. */
+static bool try_fetch_ssl_server_credentials(
+ grpc_ssl_server_security_connector *sc) {
+ grpc_ssl_server_certificate_config *certificate_config = NULL;
+ bool status;
+
+ GPR_ASSERT(sc != NULL);
+ if (!server_connector_has_cert_config_fetcher(sc)) return false;
+
+ grpc_ssl_server_credentials *server_creds =
+ (grpc_ssl_server_credentials *)sc->base.server_creds;
+ grpc_ssl_certificate_config_reload_status cb_result =
+ server_creds->certificate_config_fetcher.cb(
+ server_creds->certificate_config_fetcher.user_data,
+ &certificate_config);
+ if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_UNCHANGED) {
+ gpr_log(GPR_DEBUG, "No change in SSL server credentials.");
+ status = false;
+ } else if (cb_result == GRPC_SSL_CERTIFICATE_CONFIG_RELOAD_NEW) {
+ status = try_replace_server_handshaker_factory(sc, certificate_config);
+ } else {
+ // Log error, continue using previously-loaded credentials.
+ gpr_log(GPR_ERROR,
+ "Failed fetching new server credentials, continuing to "
+ "use previously-loaded credentials.");
+ status = false;
+ }
+
+ if (certificate_config != NULL) {
+ grpc_ssl_server_certificate_config_destroy(certificate_config);
+ }
+ return status;
+}
+
static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
grpc_server_security_connector *sc,
grpc_handshake_manager *handshake_mgr) {
grpc_ssl_server_security_connector *c =
(grpc_ssl_server_security_connector *)sc;
// Instantiate TSI handshaker.
+ try_fetch_ssl_server_credentials(c);
tsi_handshaker *tsi_hs = NULL;
tsi_result result = tsi_ssl_server_handshaker_factory_create_handshaker(
c->server_handshaker_factory, &tsi_hs);
@@ -595,7 +718,6 @@ static void ssl_server_add_handshakers(grpc_exec_ctx *exec_ctx,
tsi_result_to_string(result));
return;
}
-
// Create handshakers.
grpc_handshake_manager_add(
handshake_mgr,
@@ -857,31 +979,6 @@ grpc_slice grpc_get_default_ssl_roots_for_testing(void) {
return compute_default_pem_root_certs_once();
}
-static tsi_client_certificate_request_type
-get_tsi_client_certificate_request_type(
- grpc_ssl_client_certificate_request_type grpc_request_type) {
- switch (grpc_request_type) {
- case GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE:
- return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
-
- case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
- return TSI_REQUEST_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
-
- case GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY:
- return TSI_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
-
- case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY:
- return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_BUT_DONT_VERIFY;
-
- case GRPC_SSL_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY:
- return TSI_REQUEST_AND_REQUIRE_CLIENT_CERTIFICATE_AND_VERIFY;
-
- default:
- // Is this a sane default
- return TSI_DONT_REQUEST_CLIENT_CERTIFICATE;
- }
-}
-
const char *grpc_get_default_ssl_roots(void) {
/* TODO(jboeuf@google.com): Maybe revisit the approach which consists in
loading all the roots once for the lifetime of the process. */
@@ -897,18 +994,14 @@ grpc_security_status grpc_ssl_channel_security_connector_create(
grpc_call_credentials *request_metadata_creds,
const grpc_ssl_config *config, const char *target_name,
const char *overridden_target_name, grpc_channel_security_connector **sc) {
- size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
+ size_t num_alpn_protocols = 0;
const char **alpn_protocol_strings =
- (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
+ fill_alpn_protocol_strings(&num_alpn_protocols);
tsi_result result = TSI_OK;
grpc_ssl_channel_security_connector *c;
- size_t i;
const char *pem_root_certs;
char *port;
bool has_key_cert_pair;
- for (i = 0; i < num_alpn_protocols; i++) {
- alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
- }
if (config == NULL || target_name == NULL) {
gpr_log(GPR_ERROR, "An ssl channel needs a config and a target name.");
@@ -965,50 +1058,64 @@ error:
return GRPC_SECURITY_ERROR;
}
+static grpc_ssl_server_security_connector *
+grpc_ssl_server_security_connector_initialize(
+ grpc_server_credentials *server_creds) {
+ grpc_ssl_server_security_connector *c =
+ (grpc_ssl_server_security_connector *)gpr_zalloc(
+ sizeof(grpc_ssl_server_security_connector));
+ gpr_ref_init(&c->base.base.refcount, 1);
+ c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
+ c->base.base.vtable = &ssl_server_vtable;
+ c->base.add_handshakers = ssl_server_add_handshakers;
+ c->base.server_creds = grpc_server_credentials_ref(server_creds);
+ return c;
+}
+
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
- const grpc_ssl_server_config *config, grpc_server_security_connector **sc) {
- size_t num_alpn_protocols = grpc_chttp2_num_alpn_versions();
- const char **alpn_protocol_strings =
- (const char **)gpr_malloc(sizeof(const char *) * num_alpn_protocols);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *gsc,
+ grpc_server_security_connector **sc) {
tsi_result result = TSI_OK;
- grpc_ssl_server_security_connector *c;
- size_t i;
+ grpc_ssl_server_credentials *server_credentials =
+ (grpc_ssl_server_credentials *)gsc;
+ grpc_security_status retval = GRPC_SECURITY_OK;
- for (i = 0; i < num_alpn_protocols; i++) {
- alpn_protocol_strings[i] = grpc_chttp2_get_alpn_version_index(i);
- }
+ GPR_ASSERT(server_credentials != NULL);
+ GPR_ASSERT(sc != NULL);
- if (config == NULL || config->num_key_cert_pairs == 0) {
- gpr_log(GPR_ERROR, "An SSL server needs a key and a cert.");
- goto error;
+ grpc_ssl_server_security_connector *c =
+ grpc_ssl_server_security_connector_initialize(gsc);
+ if (server_connector_has_cert_config_fetcher(c)) {
+ // Load initial credentials from certificate_config_fetcher:
+ if (!try_fetch_ssl_server_credentials(c)) {
+ gpr_log(GPR_ERROR, "Failed loading SSL server credentials from fetcher.");
+ retval = GRPC_SECURITY_ERROR;
+ }
+ } else {
+ size_t num_alpn_protocols = 0;
+ const char **alpn_protocol_strings =
+ fill_alpn_protocol_strings(&num_alpn_protocols);
+ result = tsi_create_ssl_server_handshaker_factory_ex(
+ server_credentials->config.pem_key_cert_pairs,
+ server_credentials->config.num_key_cert_pairs,
+ server_credentials->config.pem_root_certs,
+ get_tsi_client_certificate_request_type(
+ server_credentials->config.client_certificate_request),
+ ssl_cipher_suites(), alpn_protocol_strings,
+ (uint16_t)num_alpn_protocols, &c->server_handshaker_factory);
+ gpr_free((void *)alpn_protocol_strings);
+ if (result != TSI_OK) {
+ gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
+ tsi_result_to_string(result));
+ retval = GRPC_SECURITY_ERROR;
+ }
}
- c = (grpc_ssl_server_security_connector *)gpr_zalloc(
- sizeof(grpc_ssl_server_security_connector));
- gpr_ref_init(&c->base.base.refcount, 1);
- c->base.base.url_scheme = GRPC_SSL_URL_SCHEME;
- c->base.base.vtable = &ssl_server_vtable;
- c->base.server_creds = grpc_server_credentials_ref(server_creds);
- result = tsi_create_ssl_server_handshaker_factory_ex(
- config->pem_key_cert_pairs, config->num_key_cert_pairs,
- config->pem_root_certs, get_tsi_client_certificate_request_type(
- config->client_certificate_request),
- ssl_cipher_suites(), alpn_protocol_strings, (uint16_t)num_alpn_protocols,
- &c->server_handshaker_factory);
- if (result != TSI_OK) {
- gpr_log(GPR_ERROR, "Handshaker factory creation failed with %s.",
- tsi_result_to_string(result));
- ssl_server_destroy(exec_ctx, &c->base.base);
- *sc = NULL;
- goto error;
+ if (retval == GRPC_SECURITY_OK) {
+ *sc = &c->base;
+ } else {
+ if (c != NULL) ssl_server_destroy(exec_ctx, &c->base.base);
+ if (sc != NULL) *sc = NULL;
}
- c->base.add_handshakers = ssl_server_add_handshakers;
- *sc = &c->base;
- gpr_free((void *)alpn_protocol_strings);
- return GRPC_SECURITY_OK;
-
-error:
- gpr_free((void *)alpn_protocol_strings);
- return GRPC_SECURITY_ERROR;
+ return retval;
}
diff --git a/src/core/lib/security/transport/security_connector.h b/src/core/lib/security/transport/security_connector.h
index 8287151f44..54a563bb2c 100644
--- a/src/core/lib/security/transport/security_connector.h
+++ b/src/core/lib/security/transport/security_connector.h
@@ -248,8 +248,8 @@ typedef struct {
specific error code otherwise.
*/
grpc_security_status grpc_ssl_server_security_connector_create(
- grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_creds,
- const grpc_ssl_server_config *config, grpc_server_security_connector **sc);
+ grpc_exec_ctx *exec_ctx, grpc_server_credentials *server_credentials,
+ grpc_server_security_connector **sc);
/* Util. */
const tsi_peer_property *tsi_peer_get_property_by_name(const tsi_peer *peer,
diff --git a/src/cpp/client/channel_cc.cc b/src/cpp/client/channel_cc.cc
index 19a25c838f..4049fc846b 100644
--- a/src/cpp/client/channel_cc.cc
+++ b/src/cpp/client/channel_cc.cc
@@ -52,7 +52,7 @@ namespace {
int kConnectivityCheckIntervalMsec = 500;
void WatchStateChange(void* arg);
-class TagSaver final : public CompletionQueueTag {
+class TagSaver final : public internal::CompletionQueueTag {
public:
explicit TagSaver(void* tag) : tag_(tag) {}
~TagSaver() override {}
@@ -259,8 +259,9 @@ grpc::string Channel::GetServiceConfigJSON() const {
&channel_info.service_config_json);
}
-Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
- CompletionQueue* cq) {
+internal::Call Channel::CreateCall(const internal::RpcMethod& method,
+ ClientContext* context,
+ CompletionQueue* cq) {
const bool kRegistered = method.channel_tag() && context->authority().empty();
grpc_call* c_call = NULL;
if (kRegistered) {
@@ -292,10 +293,11 @@ Call Channel::CreateCall(const RpcMethod& method, ClientContext* context,
}
grpc_census_call_set_context(c_call, context->census_context());
context->set_call(c_call, shared_from_this());
- return Call(c_call, this, cq);
+ return internal::Call(c_call, this, cq);
}
-void Channel::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Channel::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
diff --git a/src/cpp/client/generic_stub.cc b/src/cpp/client/generic_stub.cc
index 693b8bea56..fc18ce9093 100644
--- a/src/cpp/client/generic_stub.cc
+++ b/src/cpp/client/generic_stub.cc
@@ -27,8 +27,9 @@ std::unique_ptr<GenericClientAsyncReaderWriter> CallInternal(
ChannelInterface* channel, ClientContext* context,
const grpc::string& method, CompletionQueue* cq, bool start, void* tag) {
return std::unique_ptr<GenericClientAsyncReaderWriter>(
- GenericClientAsyncReaderWriter::Create(
- channel, cq, RpcMethod(method.c_str(), RpcMethod::BIDI_STREAMING),
+ internal::ClientAsyncReaderWriterFactory<ByteBuffer, ByteBuffer>::Create(
+ channel, cq, internal::RpcMethod(method.c_str(),
+ internal::RpcMethod::BIDI_STREAMING),
context, start, tag));
}
@@ -52,8 +53,9 @@ std::unique_ptr<GenericClientAsyncResponseReader> GenericStub::PrepareUnaryCall(
ClientContext* context, const grpc::string& method,
const ByteBuffer& request, CompletionQueue* cq) {
return std::unique_ptr<GenericClientAsyncResponseReader>(
- GenericClientAsyncResponseReader::Create(
- channel_.get(), cq, RpcMethod(method.c_str(), RpcMethod::NORMAL_RPC),
+ internal::ClientAsyncResponseReaderFactory<ByteBuffer>::Create(
+ channel_.get(), cq,
+ internal::RpcMethod(method.c_str(), internal::RpcMethod::NORMAL_RPC),
context, request, false));
}
diff --git a/src/cpp/common/completion_queue_cc.cc b/src/cpp/common/completion_queue_cc.cc
index 4a2e2be688..eb6dc8cc5f 100644
--- a/src/cpp/common/completion_queue_cc.cc
+++ b/src/cpp/common/completion_queue_cc.cc
@@ -60,7 +60,7 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
case GRPC_QUEUE_SHUTDOWN:
return SHUTDOWN;
case GRPC_OP_COMPLETE:
- auto cq_tag = static_cast<CompletionQueueTag*>(ev.tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(ev.tag);
*ok = ev.success != 0;
*tag = cq_tag;
if (cq_tag->FinalizeResult(tag, ok)) {
@@ -87,7 +87,7 @@ bool CompletionQueue::CompletionQueueTLSCache::Flush(void** tag, bool* ok) {
flushed_ = true;
if (grpc_completion_queue_thread_local_cache_flush(cq_->cq_, &res_tag,
&res)) {
- auto cq_tag = static_cast<CompletionQueueTag*>(res_tag);
+ auto cq_tag = static_cast<internal::CompletionQueueTag*>(res_tag);
*ok = res == 1;
if (cq_tag->FinalizeResult(tag, ok)) {
return true;
diff --git a/src/cpp/server/health/default_health_check_service.cc b/src/cpp/server/health/default_health_check_service.cc
index d2cba6d662..10dbd3c39a 100644
--- a/src/cpp/server/health/default_health_check_service.cc
+++ b/src/cpp/server/health/default_health_check_service.cc
@@ -37,11 +37,12 @@ const char kHealthCheckMethodName[] = "/grpc.health.v1.Health/Check";
DefaultHealthCheckService::HealthCheckServiceImpl::HealthCheckServiceImpl(
DefaultHealthCheckService* service)
: service_(service), method_(nullptr) {
- MethodHandler* handler =
- new RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer, ByteBuffer>(
+ internal::MethodHandler* handler =
+ new internal::RpcMethodHandler<HealthCheckServiceImpl, ByteBuffer,
+ ByteBuffer>(
std::mem_fn(&HealthCheckServiceImpl::Check), this);
- method_ = new RpcServiceMethod(kHealthCheckMethodName, RpcMethod::NORMAL_RPC,
- handler);
+ method_ = new internal::RpcServiceMethod(
+ kHealthCheckMethodName, internal::RpcMethod::NORMAL_RPC, handler);
AddMethod(method_);
}
diff --git a/src/cpp/server/health/default_health_check_service.h b/src/cpp/server/health/default_health_check_service.h
index 09d5cebe98..99d6680c50 100644
--- a/src/cpp/server/health/default_health_check_service.h
+++ b/src/cpp/server/health/default_health_check_service.h
@@ -41,7 +41,7 @@ class DefaultHealthCheckService final : public HealthCheckServiceInterface {
private:
const DefaultHealthCheckService* const service_;
- RpcServiceMethod* method_;
+ internal::RpcServiceMethod* method_;
};
DefaultHealthCheckService();
diff --git a/src/cpp/server/server_cc.cc b/src/cpp/server/server_cc.cc
index d982a3d2b7..6480482774 100644
--- a/src/cpp/server/server_cc.cc
+++ b/src/cpp/server/server_cc.cc
@@ -90,7 +90,8 @@ class Server::UnimplementedAsyncRequest final
ServerCompletionQueue* const cq_;
};
-typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
+typedef internal::SneakyCallOpSet<internal::CallOpSendInitialMetadata,
+ internal::CallOpServerSendStatus>
UnimplementedAsyncResponseOp;
class Server::UnimplementedAsyncResponse final
: public UnimplementedAsyncResponseOp {
@@ -108,12 +109,12 @@ class Server::UnimplementedAsyncResponse final
UnimplementedAsyncRequest* const request_;
};
-class ShutdownTag : public CompletionQueueTag {
+class ShutdownTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) { return false; }
};
-class DummyTag : public CompletionQueueTag {
+class DummyTag : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) {
*status = true;
@@ -121,15 +122,15 @@ class DummyTag : public CompletionQueueTag {
}
};
-class Server::SyncRequest final : public CompletionQueueTag {
+class Server::SyncRequest final : public internal::CompletionQueueTag {
public:
- SyncRequest(RpcServiceMethod* method, void* tag)
+ SyncRequest(internal::RpcServiceMethod* method, void* tag)
: method_(method),
tag_(tag),
in_flight_(false),
- has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
- method->method_type() ==
- RpcMethod::SERVER_STREAMING),
+ has_request_payload_(
+ method->method_type() == internal::RpcMethod::NORMAL_RPC ||
+ method->method_type() == internal::RpcMethod::SERVER_STREAMING),
call_details_(nullptr),
cq_(nullptr) {
grpc_metadata_array_init(&request_metadata_);
@@ -212,14 +213,14 @@ class Server::SyncRequest final : public CompletionQueueTag {
void Run(std::shared_ptr<GlobalCallbacks> global_callbacks) {
ctx_.BeginCompletionOp(&call_);
global_callbacks->PreSynchronousRequest(&ctx_);
- method_->handler()->RunHandler(
- MethodHandler::HandlerParameter(&call_, &ctx_, request_payload_));
+ method_->handler()->RunHandler(internal::MethodHandler::HandlerParameter(
+ &call_, &ctx_, request_payload_));
global_callbacks->PostSynchronousRequest(&ctx_);
request_payload_ = nullptr;
cq_.Shutdown();
- CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
+ internal::CompletionQueueTag* op_tag = ctx_.GetCompletionOpTag();
cq_.TryPluck(op_tag, gpr_inf_future(GPR_CLOCK_REALTIME));
/* Ensure the cq_ is shutdown */
@@ -229,15 +230,15 @@ class Server::SyncRequest final : public CompletionQueueTag {
private:
CompletionQueue cq_;
- Call call_;
+ internal::Call call_;
ServerContext ctx_;
const bool has_request_payload_;
grpc_byte_buffer* request_payload_;
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
};
private:
- RpcServiceMethod* const method_;
+ internal::RpcServiceMethod* const method_;
void* const tag_;
bool in_flight_;
const bool has_request_payload_;
@@ -311,14 +312,15 @@ class Server::SyncRequestThreadManager : public ThreadManager {
// object
}
- void AddSyncMethod(RpcServiceMethod* method, void* tag) {
+ void AddSyncMethod(internal::RpcServiceMethod* method, void* tag) {
sync_requests_.emplace_back(new SyncRequest(method, tag));
}
void AddUnknownSyncMethod() {
if (!sync_requests_.empty()) {
- unknown_method_.reset(new RpcServiceMethod(
- "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
+ unknown_method_.reset(new internal::RpcServiceMethod(
+ "unknown", internal::RpcMethod::BIDI_STREAMING,
+ new internal::UnknownMethodHandler));
sync_requests_.emplace_back(
new SyncRequest(unknown_method_.get(), nullptr));
}
@@ -355,8 +357,8 @@ class Server::SyncRequestThreadManager : public ThreadManager {
CompletionQueue* server_cq_;
int cq_timeout_msec_;
std::vector<std::unique_ptr<SyncRequest>> sync_requests_;
- std::unique_ptr<RpcServiceMethod> unknown_method_;
- std::unique_ptr<RpcServiceMethod> health_check_;
+ std::unique_ptr<internal::RpcServiceMethod> unknown_method_;
+ std::unique_ptr<internal::RpcServiceMethod> health_check_;
std::shared_ptr<Server::GlobalCallbacks> global_callbacks_;
};
@@ -439,13 +441,13 @@ std::shared_ptr<Channel> Server::InProcessChannel(
}
static grpc_server_register_method_payload_handling PayloadHandlingForMethod(
- RpcServiceMethod* method) {
+ internal::RpcServiceMethod* method) {
switch (method->method_type()) {
- case RpcMethod::NORMAL_RPC:
- case RpcMethod::SERVER_STREAMING:
+ case internal::RpcMethod::NORMAL_RPC:
+ case internal::RpcMethod::SERVER_STREAMING:
return GRPC_SRM_PAYLOAD_READ_INITIAL_BYTE_BUFFER;
- case RpcMethod::CLIENT_STREAMING:
- case RpcMethod::BIDI_STREAMING:
+ case internal::RpcMethod::CLIENT_STREAMING:
+ case internal::RpcMethod::BIDI_STREAMING:
return GRPC_SRM_PAYLOAD_NONE;
}
GPR_UNREACHABLE_CODE(return GRPC_SRM_PAYLOAD_NONE;);
@@ -466,7 +468,7 @@ bool Server::RegisterService(const grpc::string* host, Service* service) {
continue;
}
- RpcServiceMethod* method = it->get();
+ internal::RpcServiceMethod* method = it->get();
void* tag = grpc_server_register_method(
server_, method->name(), host ? host->c_str() : nullptr,
PayloadHandlingForMethod(method), 0);
@@ -606,7 +608,8 @@ void Server::Wait() {
}
}
-void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
+void Server::PerformOpsOnCall(internal::CallOpSetInterface* ops,
+ internal::Call* call) {
static const size_t MAX_OPS = 8;
size_t nops = 0;
grpc_op cops[MAX_OPS];
@@ -622,8 +625,8 @@ void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
ServerInterface::BaseAsyncRequest::BaseAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
- bool delete_on_finalize)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag, bool delete_on_finalize)
: server_(server),
context_(context),
stream_(stream),
@@ -645,7 +648,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
}
context_->set_call(call_);
context_->cq_ = call_cq_;
- Call call(call_, server_, call_cq_, server_->max_receive_message_size());
+ internal::Call call(call_, server_, call_cq_,
+ server_->max_receive_message_size());
if (*status && call_) {
context_->BeginCompletionOp(&call);
}
@@ -660,7 +664,8 @@ bool ServerInterface::BaseAsyncRequest::FinalizeResult(void** tag,
ServerInterface::RegisteredAsyncRequest::RegisteredAsyncRequest(
ServerInterface* server, ServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ void* tag)
: BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
void ServerInterface::RegisteredAsyncRequest::IssueRequest(
@@ -675,7 +680,7 @@ void ServerInterface::RegisteredAsyncRequest::IssueRequest(
ServerInterface::GenericAsyncRequest::GenericAsyncRequest(
ServerInterface* server, GenericServerContext* context,
- ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
+ internal::ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
: BaseAsyncRequest(server, context, stream, call_cq, tag,
delete_on_finalize) {
@@ -718,7 +723,7 @@ Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
UnimplementedAsyncRequest* request)
: request_(request) {
Status status(StatusCode::UNIMPLEMENTED, "");
- UnknownMethodHandler::FillOps(request_->context(), this);
+ internal::UnknownMethodHandler::FillOps(request_->context(), this);
request_->stream()->call_.PerformOps(this);
}
diff --git a/src/cpp/server/server_context.cc b/src/cpp/server/server_context.cc
index d7876a000b..f2cb6363f5 100644
--- a/src/cpp/server/server_context.cc
+++ b/src/cpp/server/server_context.cc
@@ -37,7 +37,7 @@ namespace grpc {
// CompletionOp
-class ServerContext::CompletionOp final : public CallOpSetInterface {
+class ServerContext::CompletionOp final : public internal::CallOpSetInterface {
public:
// initial refs: one in the server context, one in the cq
CompletionOp()
@@ -146,7 +146,7 @@ ServerContext::~ServerContext() {
}
}
-void ServerContext::BeginCompletionOp(Call* call) {
+void ServerContext::BeginCompletionOp(internal::Call* call) {
GPR_ASSERT(!completion_op_);
completion_op_ = new CompletionOp();
if (has_notify_when_done_tag_) {
@@ -155,8 +155,8 @@ void ServerContext::BeginCompletionOp(Call* call) {
call->PerformOps(completion_op_);
}
-CompletionQueueTag* ServerContext::GetCompletionOpTag() {
- return static_cast<CompletionQueueTag*>(completion_op_);
+internal::CompletionQueueTag* ServerContext::GetCompletionOpTag() {
+ return static_cast<internal::CompletionQueueTag*>(completion_op_);
}
void ServerContext::AddInitialMetadata(const grpc::string& key,
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.c b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
index cd1bd98abc..843e05ab66 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.c
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.c
@@ -155,8 +155,14 @@ grpc_google_iam_credentials_create_type grpc_google_iam_credentials_create_impor
grpc_metadata_credentials_create_from_plugin_type grpc_metadata_credentials_create_from_plugin_import;
grpc_secure_channel_create_type grpc_secure_channel_create_import;
grpc_server_credentials_release_type grpc_server_credentials_release_import;
+grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import;
+grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import;
grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import;
grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import;
+grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import;
+grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import;
+grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import;
+grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import;
grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import;
grpc_call_set_credentials_type grpc_call_set_credentials_import;
grpc_server_credentials_set_auth_metadata_processor_type grpc_server_credentials_set_auth_metadata_processor_import;
@@ -465,8 +471,14 @@ void grpc_rb_load_imports(HMODULE library) {
grpc_metadata_credentials_create_from_plugin_import = (grpc_metadata_credentials_create_from_plugin_type) GetProcAddress(library, "grpc_metadata_credentials_create_from_plugin");
grpc_secure_channel_create_import = (grpc_secure_channel_create_type) GetProcAddress(library, "grpc_secure_channel_create");
grpc_server_credentials_release_import = (grpc_server_credentials_release_type) GetProcAddress(library, "grpc_server_credentials_release");
+ grpc_ssl_server_certificate_config_create_import = (grpc_ssl_server_certificate_config_create_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_create");
+ grpc_ssl_server_certificate_config_destroy_import = (grpc_ssl_server_certificate_config_destroy_type) GetProcAddress(library, "grpc_ssl_server_certificate_config_destroy");
grpc_ssl_server_credentials_create_import = (grpc_ssl_server_credentials_create_type) GetProcAddress(library, "grpc_ssl_server_credentials_create");
grpc_ssl_server_credentials_create_ex_import = (grpc_ssl_server_credentials_create_ex_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_ex");
+ grpc_ssl_server_credentials_create_options_using_config_import = (grpc_ssl_server_credentials_create_options_using_config_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config");
+ grpc_ssl_server_credentials_create_options_using_config_fetcher_import = (grpc_ssl_server_credentials_create_options_using_config_fetcher_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_options_using_config_fetcher");
+ grpc_ssl_server_credentials_options_destroy_import = (grpc_ssl_server_credentials_options_destroy_type) GetProcAddress(library, "grpc_ssl_server_credentials_options_destroy");
+ grpc_ssl_server_credentials_create_with_options_import = (grpc_ssl_server_credentials_create_with_options_type) GetProcAddress(library, "grpc_ssl_server_credentials_create_with_options");
grpc_server_add_secure_http2_port_import = (grpc_server_add_secure_http2_port_type) GetProcAddress(library, "grpc_server_add_secure_http2_port");
grpc_call_set_credentials_import = (grpc_call_set_credentials_type) GetProcAddress(library, "grpc_call_set_credentials");
grpc_server_credentials_set_auth_metadata_processor_import = (grpc_server_credentials_set_auth_metadata_processor_type) GetProcAddress(library, "grpc_server_credentials_set_auth_metadata_processor");
diff --git a/src/ruby/ext/grpc/rb_grpc_imports.generated.h b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
index c7e78b70dc..998eef800e 100644
--- a/src/ruby/ext/grpc/rb_grpc_imports.generated.h
+++ b/src/ruby/ext/grpc/rb_grpc_imports.generated.h
@@ -446,12 +446,30 @@ extern grpc_secure_channel_create_type grpc_secure_channel_create_import;
typedef void(*grpc_server_credentials_release_type)(grpc_server_credentials *creds);
extern grpc_server_credentials_release_type grpc_server_credentials_release_import;
#define grpc_server_credentials_release grpc_server_credentials_release_import
+typedef grpc_ssl_server_certificate_config *(*grpc_ssl_server_certificate_config_create_type)(const char *pem_root_certs, const grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs);
+extern grpc_ssl_server_certificate_config_create_type grpc_ssl_server_certificate_config_create_import;
+#define grpc_ssl_server_certificate_config_create grpc_ssl_server_certificate_config_create_import
+typedef void(*grpc_ssl_server_certificate_config_destroy_type)(grpc_ssl_server_certificate_config *config);
+extern grpc_ssl_server_certificate_config_destroy_type grpc_ssl_server_certificate_config_destroy_import;
+#define grpc_ssl_server_certificate_config_destroy grpc_ssl_server_certificate_config_destroy_import
typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, int force_client_auth, void *reserved);
extern grpc_ssl_server_credentials_create_type grpc_ssl_server_credentials_create_import;
#define grpc_ssl_server_credentials_create grpc_ssl_server_credentials_create_import
typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_ex_type)(const char *pem_root_certs, grpc_ssl_pem_key_cert_pair *pem_key_cert_pairs, size_t num_key_cert_pairs, grpc_ssl_client_certificate_request_type client_certificate_request, void *reserved);
extern grpc_ssl_server_credentials_create_ex_type grpc_ssl_server_credentials_create_ex_import;
#define grpc_ssl_server_credentials_create_ex grpc_ssl_server_credentials_create_ex_import
+typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config *certificate_config);
+extern grpc_ssl_server_credentials_create_options_using_config_type grpc_ssl_server_credentials_create_options_using_config_import;
+#define grpc_ssl_server_credentials_create_options_using_config grpc_ssl_server_credentials_create_options_using_config_import
+typedef grpc_ssl_server_credentials_options *(*grpc_ssl_server_credentials_create_options_using_config_fetcher_type)(grpc_ssl_client_certificate_request_type client_certificate_request, grpc_ssl_server_certificate_config_callback cb, void *user_data);
+extern grpc_ssl_server_credentials_create_options_using_config_fetcher_type grpc_ssl_server_credentials_create_options_using_config_fetcher_import;
+#define grpc_ssl_server_credentials_create_options_using_config_fetcher grpc_ssl_server_credentials_create_options_using_config_fetcher_import
+typedef void(*grpc_ssl_server_credentials_options_destroy_type)(grpc_ssl_server_credentials_options *options);
+extern grpc_ssl_server_credentials_options_destroy_type grpc_ssl_server_credentials_options_destroy_import;
+#define grpc_ssl_server_credentials_options_destroy grpc_ssl_server_credentials_options_destroy_import
+typedef grpc_server_credentials *(*grpc_ssl_server_credentials_create_with_options_type)(grpc_ssl_server_credentials_options *options);
+extern grpc_ssl_server_credentials_create_with_options_type grpc_ssl_server_credentials_create_with_options_import;
+#define grpc_ssl_server_credentials_create_with_options grpc_ssl_server_credentials_create_with_options_import
typedef int(*grpc_server_add_secure_http2_port_type)(grpc_server *server, const char *addr, grpc_server_credentials *creds);
extern grpc_server_add_secure_http2_port_type grpc_server_add_secure_http2_port_import;
#define grpc_server_add_secure_http2_port grpc_server_add_secure_http2_port_import
diff --git a/test/cpp/codegen/compiler_test_golden b/test/cpp/codegen/compiler_test_golden
index 3d664e8825..026a94112a 100644
--- a/test/cpp/codegen/compiler_test_golden
+++ b/test/cpp/codegen/compiler_test_golden
@@ -39,7 +39,6 @@
namespace grpc {
class CompletionQueue;
class Channel;
-class RpcService;
class ServerCompletionQueue;
class ServerContext;
} // namespace grpc
@@ -169,10 +168,10 @@ class ServiceA final {
::grpc::ClientReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* MethodA4Raw(::grpc::ClientContext* context) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* AsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq, void* tag) override;
::grpc::ClientAsyncReaderWriter< ::grpc::testing::Request, ::grpc::testing::Response>* PrepareAsyncMethodA4Raw(::grpc::ClientContext* context, ::grpc::CompletionQueue* cq) override;
- const ::grpc::RpcMethod rpcmethod_MethodA1_;
- const ::grpc::RpcMethod rpcmethod_MethodA2_;
- const ::grpc::RpcMethod rpcmethod_MethodA3_;
- const ::grpc::RpcMethod rpcmethod_MethodA4_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA2_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA3_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodA4_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -352,7 +351,7 @@ class ServiceA final {
public:
WithStreamedUnaryMethod_MethodA1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodA1<BaseClass>::StreamedMethodA1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodA1() override {
BaseClassMustBeDerivedFromService(this);
@@ -373,7 +372,7 @@ class ServiceA final {
public:
WithSplitStreamingMethod_MethodA3() {
::grpc::Service::MarkMethodStreamed(2,
- new ::grpc::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::SplitServerStreamingHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithSplitStreamingMethod_MethodA3<BaseClass>::StreamedMethodA3, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithSplitStreamingMethod_MethodA3() override {
BaseClassMustBeDerivedFromService(this);
@@ -427,7 +426,7 @@ class ServiceB final {
std::shared_ptr< ::grpc::ChannelInterface> channel_;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* AsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
::grpc::ClientAsyncResponseReader< ::grpc::testing::Response>* PrepareAsyncMethodB1Raw(::grpc::ClientContext* context, const ::grpc::testing::Request& request, ::grpc::CompletionQueue* cq) override;
- const ::grpc::RpcMethod rpcmethod_MethodB1_;
+ const ::grpc::internal::RpcMethod rpcmethod_MethodB1_;
};
static std::unique_ptr<Stub> NewStub(const std::shared_ptr< ::grpc::ChannelInterface>& channel, const ::grpc::StubOptions& options = ::grpc::StubOptions());
@@ -484,7 +483,7 @@ class ServiceB final {
public:
WithStreamedUnaryMethod_MethodB1() {
::grpc::Service::MarkMethodStreamed(0,
- new ::grpc::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
+ new ::grpc::internal::StreamedUnaryHandler< ::grpc::testing::Request, ::grpc::testing::Response>(std::bind(&WithStreamedUnaryMethod_MethodB1<BaseClass>::StreamedMethodB1, this, std::placeholders::_1, std::placeholders::_2)));
}
~WithStreamedUnaryMethod_MethodB1() override {
BaseClassMustBeDerivedFromService(this);
diff --git a/test/cpp/microbenchmarks/bm_cq.cc b/test/cpp/microbenchmarks/bm_cq.cc
index a0c0414f2f..020ec0461c 100644
--- a/test/cpp/microbenchmarks/bm_cq.cc
+++ b/test/cpp/microbenchmarks/bm_cq.cc
@@ -70,7 +70,7 @@ BENCHMARK(BM_CreateDestroyCore);
static void DoneWithCompletionOnStack(grpc_exec_ctx* exec_ctx, void* arg,
grpc_cq_completion* completion) {}
-class DummyTag final : public CompletionQueueTag {
+class DummyTag final : public internal::CompletionQueueTag {
public:
bool FinalizeResult(void** tag, bool* status) override { return true; }
};
diff --git a/test/cpp/qps/client_async.cc b/test/cpp/qps/client_async.cc
index b5c7208664..a541f94fa5 100644
--- a/test/cpp/qps/client_async.cc
+++ b/test/cpp/qps/client_async.cc
@@ -245,9 +245,20 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
if (!cli_cqs_[cq_[thread_idx]]->Next(&got_tag, &ok)) {
return;
}
- ClientRpcContext* ctx;
+ ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
std::mutex* shutdown_mu = &shutdown_state_[thread_idx]->mutex;
- do {
+ shutdown_mu->lock();
+ while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
+ [&, ctx, ok, entry_ptr, shutdown_mu]() {
+ if (!ctx->RunNextState(ok, entry_ptr)) {
+ // The RPC and callback are done, so clone the ctx
+ // and kickstart the new one
+ ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
+ delete ctx;
+ }
+ shutdown_mu->unlock();
+ },
+ &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME))) {
t->UpdateHistogram(entry_ptr);
// Got a regular event, so process it
ctx = ClientRpcContext::detag(got_tag);
@@ -265,18 +276,7 @@ class AsyncClient : public ClientImpl<StubType, RequestType> {
shutdown_mu->unlock();
return;
}
- } while (cli_cqs_[cq_[thread_idx]]->DoThenAsyncNext(
- [&, ctx, ok, entry_ptr, shutdown_mu]() {
- bool next_ok = ok;
- if (!ctx->RunNextState(next_ok, entry_ptr)) {
- // The RPC and callback are done, so clone the ctx
- // and kickstart the new one
- ctx->StartNewClone(cli_cqs_[cq_[thread_idx]].get());
- delete ctx;
- }
- shutdown_mu->unlock();
- },
- &got_tag, &ok, gpr_inf_future(GPR_CLOCK_REALTIME)));
+ }
}
std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 4cf80e9e3d..1c1a5636a9 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -206,13 +206,12 @@ class AsyncQpsServerTest final : public grpc::testing::Server {
return;
}
ServerRpcContext *ctx;
- std::mutex *mu_ptr;
+ std::mutex *mu_ptr = &shutdown_state_[thread_idx]->mutex;
do {
ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
// Proceed while holding a lock to make sure that
// this thread isn't supposed to shut down
- mu_ptr = &shutdown_state_[thread_idx]->mutex;
mu_ptr->lock();
if (shutdown_state_[thread_idx]->shutdown) {
mu_ptr->unlock();