aboutsummaryrefslogtreecommitdiffhomepage
path: root/include
diff options
context:
space:
mode:
authorGravatar yang-g <yangg@google.com>2015-07-16 21:03:49 -0700
committerGravatar yang-g <yangg@google.com>2015-07-16 21:03:49 -0700
commitfb58d239ef360b4fcdeeda358e46bb107ceceaa4 (patch)
tree5768a672fc92eacf5c4a573b3d52c1b3723bc738 /include
parent7ef7232dd9eba4be7d81b59a6bb59c4fd634ea08 (diff)
parent65ef0fffaecf8396cb93b3a76416d099f7b07438 (diff)
Merge with head
Diffstat (limited to 'include')
-rw-r--r--include/grpc++/fixed_size_thread_pool.h67
-rw-r--r--include/grpc++/impl/call.h141
-rw-r--r--include/grpc++/server_context.h6
-rw-r--r--include/grpc++/stream.h26
-rw-r--r--include/grpc++/thread_pool_interface.h2
5 files changed, 218 insertions, 24 deletions
diff --git a/include/grpc++/fixed_size_thread_pool.h b/include/grpc++/fixed_size_thread_pool.h
new file mode 100644
index 0000000000..9f0cbfbae9
--- /dev/null
+++ b/include/grpc++/fixed_size_thread_pool.h
@@ -0,0 +1,67 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#ifndef GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
+#define GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
+
+#include <grpc++/config.h>
+
+#include <grpc++/impl/sync.h>
+#include <grpc++/impl/thd.h>
+#include <grpc++/thread_pool_interface.h>
+
+#include <queue>
+#include <vector>
+
+namespace grpc {
+
+class FixedSizeThreadPool GRPC_FINAL : public ThreadPoolInterface {
+ public:
+ explicit FixedSizeThreadPool(int num_threads);
+ ~FixedSizeThreadPool();
+
+ void ScheduleCallback(const std::function<void()>& callback) GRPC_OVERRIDE;
+
+ private:
+ grpc::mutex mu_;
+ grpc::condition_variable cv_;
+ bool shutdown_;
+ std::queue<std::function<void()>> callbacks_;
+ std::vector<grpc::thread*> threads_;
+
+ void ThreadFunc();
+};
+
+} // namespace grpc
+
+#endif // GRPC_INTERNAL_CPP_SERVER_THREAD_POOL_H
diff --git a/include/grpc++/impl/call.h b/include/grpc++/impl/call.h
index 64fa5d6efb..1fa4490779 100644
--- a/include/grpc++/impl/call.h
+++ b/include/grpc++/impl/call.h
@@ -60,6 +60,93 @@ void FillMetadataMap(grpc_metadata_array* arr,
grpc_metadata* FillMetadataArray(
const std::multimap<grpc::string, grpc::string>& metadata);
+/// Per-message write options.
+class WriteOptions {
+ public:
+ WriteOptions() : flags_(0) {}
+ WriteOptions(const WriteOptions& other) : flags_(other.flags_) {}
+
+ /// Clear all flags.
+ inline void Clear() {
+ flags_ = 0;
+ }
+
+ /// Returns raw flags bitset.
+ inline gpr_uint32 flags() const {
+ return flags_;
+ }
+
+ /// Sets flag for the disabling of compression for the next message write.
+ ///
+ /// \sa GRPC_WRITE_NO_COMPRESS
+ inline WriteOptions& set_no_compression() {
+ SetBit(GRPC_WRITE_NO_COMPRESS);
+ return *this;
+ }
+
+ /// Clears flag for the disabling of compression for the next message write.
+ ///
+ /// \sa GRPC_WRITE_NO_COMPRESS
+ inline WriteOptions& clear_no_compression() {
+ ClearBit(GRPC_WRITE_NO_COMPRESS);
+ return *this;
+ }
+
+ /// Get value for the flag indicating whether compression for the next
+ /// message write is forcefully disabled.
+ ///
+ /// \sa GRPC_WRITE_NO_COMPRESS
+ inline bool get_no_compression() const {
+ return GetBit(GRPC_WRITE_NO_COMPRESS);
+ }
+
+ /// Sets flag indicating that the write may be buffered and need not go out on
+ /// the wire immediately.
+ ///
+ /// \sa GRPC_WRITE_BUFFER_HINT
+ inline WriteOptions& set_buffer_hint() {
+ SetBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ /// Clears flag indicating that the write may be buffered and need not go out
+ /// on the wire immediately.
+ ///
+ /// \sa GRPC_WRITE_BUFFER_HINT
+ inline WriteOptions& clear_buffer_hint() {
+ ClearBit(GRPC_WRITE_BUFFER_HINT);
+ return *this;
+ }
+
+ /// Get value for the flag indicating that the write may be buffered and need
+ /// not go out on the wire immediately.
+ ///
+ /// \sa GRPC_WRITE_BUFFER_HINT
+ inline bool get_buffer_hint() const {
+ return GetBit(GRPC_WRITE_BUFFER_HINT);
+ }
+
+ WriteOptions& operator=(const WriteOptions& rhs) {
+ flags_ = rhs.flags_;
+ return *this;
+ }
+
+ private:
+ void SetBit(const gpr_int32 mask) {
+ flags_ |= mask;
+ }
+
+ void ClearBit(const gpr_int32 mask) {
+ flags_ &= ~mask;
+ }
+
+ bool GetBit(const gpr_int32 mask) const {
+ return flags_ & mask;
+ }
+
+ gpr_uint32 flags_;
+};
+
/// Default argument for CallOpSet. I is unused by the class, but can be
/// used for generating multiple names for the same thing.
template <int I>
@@ -104,6 +191,12 @@ class CallOpSendMessage {
public:
CallOpSendMessage() : send_buf_(nullptr), own_buf_(false) {}
+ /// Send \a message using \a options for the write. The \a options are cleared
+ /// after use.
+ template <class M>
+ Status SendMessage(const M& message,
+ const WriteOptions& options) GRPC_MUST_USE_RESULT;
+
template <class M>
Status SendMessage(const M& message) GRPC_MUST_USE_RESULT;
@@ -112,8 +205,10 @@ class CallOpSendMessage {
if (send_buf_ == nullptr) return;
grpc_op* op = &ops[(*nops)++];
op->op = GRPC_OP_SEND_MESSAGE;
- op->flags = 0;
+ op->flags = write_options_.flags();
op->data.send_message = send_buf_;
+ // Flags are per-message: clear them after use.
+ write_options_.Clear();
}
void FinishOp(bool* status, int max_message_size) {
if (own_buf_) grpc_byte_buffer_destroy(send_buf_);
@@ -122,14 +217,22 @@ class CallOpSendMessage {
private:
grpc_byte_buffer* send_buf_;
+ WriteOptions write_options_;
bool own_buf_;
};
template <class M>
-Status CallOpSendMessage::SendMessage(const M& message) {
+Status CallOpSendMessage::SendMessage(const M& message,
+ const WriteOptions& options) {
+ write_options_ = options;
return SerializationTraits<M>::Serialize(message, &send_buf_, &own_buf_);
}
+template <class M>
+Status CallOpSendMessage::SendMessage(const M& message) {
+ return SendMessage(message, WriteOptions());
+}
+
template <class R>
class CallOpRecvMessage {
public:
@@ -172,17 +275,34 @@ class CallOpRecvMessage {
grpc_byte_buffer* recv_buf_;
};
+namespace CallOpGenericRecvMessageHelper {
+class DeserializeFunc {
+ public:
+ virtual Status Deserialize(grpc_byte_buffer* buf, int max_message_size) = 0;
+};
+
+template <class R>
+class DeserializeFuncType GRPC_FINAL : public DeserializeFunc {
+ public:
+ DeserializeFuncType(R* message) : message_(message) {}
+ Status Deserialize(grpc_byte_buffer* buf,
+ int max_message_size) GRPC_OVERRIDE {
+ return SerializationTraits<R>::Deserialize(buf, message_, max_message_size);
+ }
+
+ private:
+ R* message_; // Not a managed pointer because management is external to this
+};
+} // namespace CallOpGenericRecvMessageHelper
+
class CallOpGenericRecvMessage {
public:
CallOpGenericRecvMessage() : got_message(false) {}
template <class R>
void RecvMessage(R* message) {
- deserialize_ = [message](grpc_byte_buffer* buf,
- int max_message_size) -> Status {
- return SerializationTraits<R>::Deserialize(buf, message,
- max_message_size);
- };
+ deserialize_.reset(
+ new CallOpGenericRecvMessageHelper::DeserializeFuncType<R>(message));
}
bool got_message;
@@ -201,7 +321,7 @@ class CallOpGenericRecvMessage {
if (recv_buf_) {
if (*status) {
got_message = true;
- *status = deserialize_(recv_buf_, max_message_size).ok();
+ *status = deserialize_->Deserialize(recv_buf_, max_message_size).ok();
} else {
got_message = false;
grpc_byte_buffer_destroy(recv_buf_);
@@ -210,12 +330,11 @@ class CallOpGenericRecvMessage {
got_message = false;
*status = false;
}
- deserialize_ = DeserializeFunc();
+ deserialize_.reset();
}
private:
- typedef std::function<Status(grpc_byte_buffer*, int)> DeserializeFunc;
- DeserializeFunc deserialize_;
+ std::unique_ptr<CallOpGenericRecvMessageHelper::DeserializeFunc> deserialize_;
grpc_byte_buffer* recv_buf_;
};
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index a4ee986df1..2123d03291 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -99,9 +99,7 @@ class ServerContext {
return client_metadata_;
}
- std::shared_ptr<const AuthContext> auth_context() const {
- return auth_context_;
- }
+ std::shared_ptr<const AuthContext> auth_context() const;
private:
friend class ::grpc::Server;
@@ -147,7 +145,7 @@ class ServerContext {
grpc_call* call_;
CompletionQueue* cq_;
bool sent_initial_metadata_;
- std::shared_ptr<const AuthContext> auth_context_;
+ mutable std::shared_ptr<const AuthContext> auth_context_;
std::multimap<grpc::string, grpc::string> client_metadata_;
std::multimap<grpc::string, grpc::string> initial_metadata_;
std::multimap<grpc::string, grpc::string> trailing_metadata_;
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index dd5e52d6d3..3903f2ec06 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -79,7 +79,11 @@ class WriterInterface {
// Blocking write msg to the stream. Returns true on success.
// Returns false when the stream has been closed.
- virtual bool Write(const W& msg) = 0;
+ virtual bool Write(const W& msg, const WriteOptions& options) = 0;
+
+ inline bool Write(const W& msg) {
+ return Write(msg, WriteOptions());
+ }
};
template <class R>
@@ -168,9 +172,10 @@ class ClientWriter : public ClientWriterInterface<W> {
cq_.Pluck(&ops);
}
- bool Write(const W& msg) GRPC_OVERRIDE {
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg).ok()) {
+ if (!ops.SendMessage(msg, options).ok()) {
return false;
}
call_.PerformOps(&ops);
@@ -246,9 +251,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
return cq_.Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg) GRPC_OVERRIDE {
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
- if (!ops.SendMessage(msg).ok()) return false;
+ if (!ops.SendMessage(msg, options).ok()) return false;
call_.PerformOps(&ops);
return cq_.Pluck(&ops);
}
@@ -317,9 +323,10 @@ class ServerWriter GRPC_FINAL : public WriterInterface<W> {
call_->cq()->Pluck(&ops);
}
- bool Write(const W& msg) GRPC_OVERRIDE {
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
- if (!ops.SendMessage(msg).ok()) {
+ if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
@@ -359,9 +366,10 @@ class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
return call_->cq()->Pluck(&ops) && ops.got_message;
}
- bool Write(const W& msg) GRPC_OVERRIDE {
+ using WriterInterface<W>::Write;
+ bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
- if (!ops.SendMessage(msg).ok()) {
+ if (!ops.SendMessage(msg, options).ok()) {
return false;
}
if (!ctx_->sent_initial_metadata_) {
diff --git a/include/grpc++/thread_pool_interface.h b/include/grpc++/thread_pool_interface.h
index ead307f6a2..ac4458d530 100644
--- a/include/grpc++/thread_pool_interface.h
+++ b/include/grpc++/thread_pool_interface.h
@@ -47,6 +47,8 @@ class ThreadPoolInterface {
virtual void ScheduleCallback(const std::function<void()>& callback) = 0;
};
+ThreadPoolInterface* CreateDefaultThreadPool();
+
} // namespace grpc
#endif // GRPCXX_THREAD_POOL_INTERFACE_H