aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/cpp/common')
-rw-r--r--src/cpp/common/call.cc92
-rw-r--r--src/cpp/common/completion_queue.cc33
-rw-r--r--src/cpp/common/core_codegen.cc256
-rw-r--r--src/cpp/common/core_codegen.h71
4 files changed, 331 insertions, 121 deletions
diff --git a/src/cpp/common/call.cc b/src/cpp/common/call.cc
deleted file mode 100644
index 5b87c2a806..0000000000
--- a/src/cpp/common/call.cc
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- *
- * Copyright 2015, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/impl/call.h>
-
-#include <grpc/support/alloc.h>
-#include <grpc++/channel.h>
-#include <grpc++/client_context.h>
-#include <grpc++/support/byte_buffer.h>
-#include "src/core/profiling/timers.h"
-
-namespace grpc {
-
-void FillMetadataMap(
- grpc_metadata_array* arr,
- std::multimap<grpc::string_ref, grpc::string_ref>* metadata) {
- for (size_t i = 0; i < arr->count; i++) {
- // TODO(yangg) handle duplicates?
- metadata->insert(std::pair<grpc::string_ref, grpc::string_ref>(
- arr->metadata[i].key, grpc::string_ref(arr->metadata[i].value,
- arr->metadata[i].value_length)));
- }
- grpc_metadata_array_destroy(arr);
- grpc_metadata_array_init(arr);
-}
-
-// TODO(yangg) if the map is changed before we send, the pointers will be a
-// mess. Make sure it does not happen.
-grpc_metadata* FillMetadataArray(
- const std::multimap<grpc::string, grpc::string>& metadata) {
- if (metadata.empty()) {
- return nullptr;
- }
- grpc_metadata* metadata_array =
- (grpc_metadata*)gpr_malloc(metadata.size() * sizeof(grpc_metadata));
- size_t i = 0;
- for (auto iter = metadata.cbegin(); iter != metadata.cend(); ++iter, ++i) {
- metadata_array[i].key = iter->first.c_str();
- metadata_array[i].value = iter->second.c_str();
- metadata_array[i].value_length = iter->second.size();
- }
- return metadata_array;
-}
-
-Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq)
- : call_hook_(call_hook), cq_(cq), call_(call), max_message_size_(-1) {}
-
-Call::Call(grpc_call* call, CallHook* call_hook, CompletionQueue* cq,
- int max_message_size)
- : call_hook_(call_hook),
- cq_(cq),
- call_(call),
- max_message_size_(max_message_size) {}
-
-void Call::PerformOps(CallOpSetInterface* ops) {
- if (max_message_size_ > 0) {
- ops->set_max_message_size(max_message_size_);
- }
- call_hook_->PerformOpsOnCall(ops, this);
-}
-
-} // namespace grpc
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index 4f76dfff1d..729dc33749 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -34,7 +34,6 @@
#include <memory>
-#include <grpc++/impl/codegen/completion_queue_tag.h>
#include <grpc++/impl/grpc_library.h>
#include <grpc++/support/time.h>
#include <grpc/grpc.h>
@@ -43,16 +42,13 @@
namespace grpc {
static internal::GrpcLibraryInitializer g_gli_initializer;
-CompletionQueue::CompletionQueue() {
- g_gli_initializer.summon();
- cq_ = grpc_completion_queue_create(nullptr);
-}
CompletionQueue::CompletionQueue(grpc_completion_queue* take) : cq_(take) {}
-CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
-
-void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
+void CompletionQueue::Shutdown() {
+ g_gli_initializer.summon();
+ grpc_completion_queue_shutdown(cq_);
+}
CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
void** tag, bool* ok, gpr_timespec deadline) {
@@ -75,25 +71,4 @@ CompletionQueue::NextStatus CompletionQueue::AsyncNextInternal(
}
}
-bool CompletionQueue::Pluck(CompletionQueueTag* tag) {
- auto deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- bool ok = ev.success != 0;
- void* ignored = tag;
- GPR_ASSERT(tag->FinalizeResult(&ignored, &ok));
- GPR_ASSERT(ignored == tag);
- // Ignore mutations by FinalizeResult: Pluck returns the C API status
- return ev.success != 0;
-}
-
-void CompletionQueue::TryPluck(CompletionQueueTag* tag) {
- auto deadline = gpr_time_0(GPR_CLOCK_REALTIME);
- auto ev = grpc_completion_queue_pluck(cq_, tag, deadline, nullptr);
- if (ev.type == GRPC_QUEUE_TIMEOUT) return;
- bool ok = ev.success != 0;
- void* ignored = tag;
- // the tag must be swallowed if using TryPluck
- GPR_ASSERT(!tag->FinalizeResult(&ignored, &ok));
-}
-
} // namespace grpc
diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc
new file mode 100644
index 0000000000..45e9e278a0
--- /dev/null
+++ b/src/cpp/common/core_codegen.cc
@@ -0,0 +1,256 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include "src/cpp/common/core_codegen.h"
+
+#include <stdlib.h>
+
+#include <grpc++/support/config.h>
+#include <grpc/byte_buffer.h>
+#include <grpc/byte_buffer_reader.h>
+#include <grpc/grpc.h>
+#include <grpc/impl/codegen/alloc.h>
+#include <grpc/impl/codegen/byte_buffer.h>
+#include <grpc/impl/codegen/log.h>
+#include <grpc/support/port_platform.h>
+#include <grpc/support/slice.h>
+#include <grpc/support/slice_buffer.h>
+
+#include "src/core/profiling/timers.h"
+
+namespace {
+
+const int kGrpcBufferWriterMaxBufferLength = 8192;
+
+class GrpcBufferWriter GRPC_FINAL
+ : public ::grpc::protobuf::io::ZeroCopyOutputStream {
+ public:
+ explicit GrpcBufferWriter(grpc_byte_buffer** bp, int block_size)
+ : block_size_(block_size), byte_count_(0), have_backup_(false) {
+ *bp = grpc_raw_byte_buffer_create(NULL, 0);
+ slice_buffer_ = &(*bp)->data.raw.slice_buffer;
+ }
+
+ ~GrpcBufferWriter() GRPC_OVERRIDE {
+ if (have_backup_) {
+ gpr_slice_unref(backup_slice_);
+ }
+ }
+
+ bool Next(void** data, int* size) GRPC_OVERRIDE {
+ if (have_backup_) {
+ slice_ = backup_slice_;
+ have_backup_ = false;
+ } else {
+ slice_ = gpr_slice_malloc(block_size_);
+ }
+ *data = GPR_SLICE_START_PTR(slice_);
+ // On win x64, int is only 32bit
+ GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
+ byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
+ gpr_slice_buffer_add(slice_buffer_, slice_);
+ return true;
+ }
+
+ void BackUp(int count) GRPC_OVERRIDE {
+ gpr_slice_buffer_pop(slice_buffer_);
+ if (count == block_size_) {
+ backup_slice_ = slice_;
+ } else {
+ backup_slice_ =
+ gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
+ gpr_slice_buffer_add(slice_buffer_, slice_);
+ }
+ have_backup_ = true;
+ byte_count_ -= count;
+ }
+
+ grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
+
+ private:
+ const int block_size_;
+ int64_t byte_count_;
+ gpr_slice_buffer* slice_buffer_;
+ bool have_backup_;
+ gpr_slice backup_slice_;
+ gpr_slice slice_;
+};
+
+class GrpcBufferReader GRPC_FINAL
+ : public ::grpc::protobuf::io::ZeroCopyInputStream {
+ public:
+ explicit GrpcBufferReader(grpc_byte_buffer* buffer)
+ : byte_count_(0), backup_count_(0) {
+ grpc_byte_buffer_reader_init(&reader_, buffer);
+ }
+ ~GrpcBufferReader() GRPC_OVERRIDE {
+ grpc_byte_buffer_reader_destroy(&reader_);
+ }
+
+ bool Next(const void** data, int* size) GRPC_OVERRIDE {
+ if (backup_count_ > 0) {
+ *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
+ backup_count_;
+ GPR_ASSERT(backup_count_ <= INT_MAX);
+ *size = (int)backup_count_;
+ backup_count_ = 0;
+ return true;
+ }
+ if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
+ return false;
+ }
+ gpr_slice_unref(slice_);
+ *data = GPR_SLICE_START_PTR(slice_);
+ // On win x64, int is only 32bit
+ GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
+ byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
+ return true;
+ }
+
+ void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
+
+ bool Skip(int count) GRPC_OVERRIDE {
+ const void* data;
+ int size;
+ while (Next(&data, &size)) {
+ if (size >= count) {
+ BackUp(size - count);
+ return true;
+ }
+ // size < count;
+ count -= size;
+ }
+ // error or we have too large count;
+ return false;
+ }
+
+ grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
+ return byte_count_ - backup_count_;
+ }
+
+ private:
+ int64_t byte_count_;
+ int64_t backup_count_;
+ grpc_byte_buffer_reader reader_;
+ gpr_slice slice_;
+};
+} // namespace
+
+namespace grpc {
+
+grpc_completion_queue* CoreCodegen::grpc_completion_queue_create(
+ void* reserved) {
+ return ::grpc_completion_queue_create(reserved);
+}
+
+void CoreCodegen::grpc_completion_queue_destroy(grpc_completion_queue* cq) {
+ ::grpc_completion_queue_destroy(cq);
+}
+
+grpc_event CoreCodegen::grpc_completion_queue_pluck(grpc_completion_queue* cq,
+ void* tag,
+ gpr_timespec deadline,
+ void* reserved) {
+ return ::grpc_completion_queue_pluck(cq, tag, deadline, reserved);
+}
+
+void* CoreCodegen::gpr_malloc(size_t size) { return ::gpr_malloc(size); }
+
+void CoreCodegen::gpr_free(void* p) { return ::gpr_free(p); }
+
+void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) {
+ ::grpc_byte_buffer_destroy(bb);
+}
+
+void CoreCodegen::grpc_metadata_array_init(grpc_metadata_array* array) {
+ ::grpc_metadata_array_init(array);
+}
+
+void CoreCodegen::grpc_metadata_array_destroy(grpc_metadata_array* array) {
+ ::grpc_metadata_array_destroy(array);
+}
+
+gpr_timespec CoreCodegen::gpr_inf_future(gpr_clock_type type) {
+ return ::gpr_inf_future(type);
+}
+
+void CoreCodegen::assert_fail(const char* failed_assertion) {
+ gpr_log(GPR_ERROR, "assertion failed: %s", failed_assertion);
+ abort();
+}
+
+Status CoreCodegen::SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) {
+ GPR_TIMER_SCOPE("SerializeProto", 0);
+ int byte_size = msg.ByteSize();
+ if (byte_size <= kGrpcBufferWriterMaxBufferLength) {
+ gpr_slice slice = gpr_slice_malloc(byte_size);
+ GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
+ msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
+ *bp = grpc_raw_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ return Status::OK;
+ } else {
+ GrpcBufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength);
+ return msg.SerializeToZeroCopyStream(&writer)
+ ? Status::OK
+ : Status(StatusCode::INTERNAL, "Failed to serialize message");
+ }
+}
+
+Status CoreCodegen::DeserializeProto(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) {
+ GPR_TIMER_SCOPE("DeserializeProto", 0);
+ if (buffer == nullptr) {
+ return Status(StatusCode::INTERNAL, "No payload");
+ }
+ Status result = Status::OK;
+ {
+ GrpcBufferReader reader(buffer);
+ ::grpc::protobuf::io::CodedInputStream decoder(&reader);
+ if (max_message_size > 0) {
+ decoder.SetTotalBytesLimit(max_message_size, max_message_size);
+ }
+ if (!msg->ParseFromCodedStream(&decoder)) {
+ result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
+ }
+ if (!decoder.ConsumedEntireMessage()) {
+ result = Status(StatusCode::INTERNAL, "Did not read entire message");
+ }
+ }
+ grpc_byte_buffer_destroy(buffer);
+ return result;
+}
+
+} // namespace grpc
diff --git a/src/cpp/common/core_codegen.h b/src/cpp/common/core_codegen.h
new file mode 100644
index 0000000000..0d8c6b79f7
--- /dev/null
+++ b/src/cpp/common/core_codegen.h
@@ -0,0 +1,71 @@
+/*
+ *
+ * Copyright 2016, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// This file should be compiled as part of grpc++.
+
+#include <grpc++/impl/codegen/core_codegen_interface.h>
+#include <grpc/impl/codegen/grpc_types.h>
+#include <grpc/byte_buffer.h>
+
+namespace grpc {
+
+/// Implementation of the core codegen interface.
+class CoreCodegen : public CoreCodegenInterface {
+ private:
+ Status SerializeProto(const grpc::protobuf::Message& msg,
+ grpc_byte_buffer** bp) override;
+
+ Status DeserializeProto(grpc_byte_buffer* buffer,
+ grpc::protobuf::Message* msg,
+ int max_message_size) override;
+
+ grpc_completion_queue* grpc_completion_queue_create(void* reserved) override;
+ void grpc_completion_queue_destroy(grpc_completion_queue* cq) override;
+ grpc_event grpc_completion_queue_pluck(grpc_completion_queue* cq, void* tag,
+ gpr_timespec deadline,
+ void* reserved) override;
+
+ void* gpr_malloc(size_t size) override;
+ void gpr_free(void* p) override;
+
+ void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
+
+ void grpc_metadata_array_init(grpc_metadata_array* array) override;
+ void grpc_metadata_array_destroy(grpc_metadata_array* array) override;
+
+ gpr_timespec gpr_inf_future(gpr_clock_type type) override;
+
+ void assert_fail(const char* failed_assertion) override;
+};
+
+} // namespace grpc