aboutsummaryrefslogtreecommitdiffhomepage
path: root/include/grpc++
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2017-04-20 19:24:57 +0000
committerGravatar Craig Tiller <ctiller@google.com>2017-04-20 19:24:57 +0000
commit59db187c853c32cdb031f6cc6af65f9df76c07ef (patch)
treedf82946ada84d286dfa1cbacbeadfc82cf0a68f7 /include/grpc++
parentae1e29eed96798cadfed15b2e1e63e92dcf6d36e (diff)
parentc4c296e2d8ad6b90f5e36e4739ae3d68c66ace1f (diff)
Merge branch 'tsan-c++' into hybrid
Diffstat (limited to 'include/grpc++')
-rw-r--r--include/grpc++/impl/codegen/async_stream.h129
-rw-r--r--include/grpc++/impl/codegen/async_unary_call.h91
-rw-r--r--include/grpc++/impl/codegen/call.h34
-rw-r--r--include/grpc++/impl/codegen/core_codegen.h4
-rw-r--r--include/grpc++/impl/codegen/core_codegen_interface.h4
-rw-r--r--include/grpc++/impl/codegen/status_helper.h47
6 files changed, 158 insertions, 151 deletions
diff --git a/include/grpc++/impl/codegen/async_stream.h b/include/grpc++/impl/codegen/async_stream.h
index 8f529895ca..f97d824baf 100644
--- a/include/grpc++/impl/codegen/async_stream.h
+++ b/include/grpc++/impl/codegen/async_stream.h
@@ -145,17 +145,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
public:
/// Create a stream and write the first request out.
template <class W>
- ClientAsyncReader(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- const W& request, void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- init_ops_.set_output_tag(tag);
- init_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- // TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
- init_ops_.ClientSendClose();
- call_.PerformOps(&init_ops_);
+ static ClientAsyncReader* Create(ChannelInterface* channel,
+ CompletionQueue* cq, const RpcMethod& method,
+ ClientContext* context, const W& request,
+ void* tag) {
+ Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncReader)))
+ ClientAsyncReader(call, context, request, tag);
+ }
+
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientAsyncReader));
}
void ReadInitialMetadata(void* tag) override {
@@ -185,6 +187,19 @@ class ClientAsyncReader final : public ClientAsyncReaderInterface<R> {
}
private:
+ template <class W>
+ ClientAsyncReader(Call call, ClientContext* context, const W& request,
+ void* tag)
+ : context_(context), call_(call) {
+ init_ops_.set_output_tag(tag);
+ init_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(init_ops_.SendMessage(request).ok());
+ init_ops_.ClientSendClose();
+ call_.PerformOps(&init_ops_);
+ }
+
ClientContext* context_;
Call call_;
CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
@@ -210,23 +225,19 @@ template <class W>
class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
public:
template <class R>
- ClientAsyncWriter(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- R* response, void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- finish_ops_.RecvMessage(response);
- finish_ops_.AllowNoMessage();
- // if corked bit is set in context, we buffer up the initial metadata to
- // coalesce with later message to be sent. No op is performed.
- if (context_->initial_metadata_corked_) {
- write_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- } else {
- write_ops_.set_output_tag(tag);
- write_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&write_ops_);
- }
+ static ClientAsyncWriter* Create(ChannelInterface* channel,
+ CompletionQueue* cq, const RpcMethod& method,
+ ClientContext* context, R* response,
+ void* tag) {
+ Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncWriter)))
+ ClientAsyncWriter(call, context, response, tag);
+ }
+
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientAsyncWriter));
}
void ReadInitialMetadata(void* tag) override {
@@ -271,6 +282,24 @@ class ClientAsyncWriter final : public ClientAsyncWriterInterface<W> {
}
private:
+ template <class R>
+ ClientAsyncWriter(Call call, ClientContext* context, R* response, void* tag)
+ : context_(context), call_(call) {
+ finish_ops_.RecvMessage(response);
+ finish_ops_.AllowNoMessage();
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ if (context_->initial_metadata_corked_) {
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
+ }
+
ClientContext* context_;
Call call_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
@@ -298,21 +327,20 @@ template <class W, class R>
class ClientAsyncReaderWriter final
: public ClientAsyncReaderWriterInterface<W, R> {
public:
- ClientAsyncReaderWriter(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- void* tag)
- : context_(context), call_(channel->CreateCall(method, context, cq)) {
- if (context_->initial_metadata_corked_) {
- // if corked bit is set in context, we buffer up the initial metadata to
- // coalesce with later message to be sent. No op is performed.
- write_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- } else {
- write_ops_.set_output_tag(tag);
- write_ops_.SendInitialMetadata(context->send_initial_metadata_,
- context->initial_metadata_flags());
- call_.PerformOps(&write_ops_);
- }
+ static ClientAsyncReaderWriter* Create(ChannelInterface* channel,
+ CompletionQueue* cq,
+ const RpcMethod& method,
+ ClientContext* context, void* tag) {
+ Call call = channel->CreateCall(method, context, cq);
+
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncReaderWriter)))
+ ClientAsyncReaderWriter(call, context, tag);
+ }
+
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientAsyncReaderWriter));
}
void ReadInitialMetadata(void* tag) override {
@@ -366,6 +394,21 @@ class ClientAsyncReaderWriter final
}
private:
+ ClientAsyncReaderWriter(Call call, ClientContext* context, void* tag)
+ : context_(context), call_(call) {
+ if (context_->initial_metadata_corked_) {
+ // if corked bit is set in context, we buffer up the initial metadata to
+ // coalesce with later message to be sent. No op is performed.
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ } else {
+ write_ops_.set_output_tag(tag);
+ write_ops_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ call_.PerformOps(&write_ops_);
+ }
+ }
+
ClientContext* context_;
Call call_;
CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
diff --git a/include/grpc++/impl/codegen/async_unary_call.h b/include/grpc++/impl/codegen/async_unary_call.h
index b77a16b699..a147a6acbf 100644
--- a/include/grpc++/impl/codegen/async_unary_call.h
+++ b/include/grpc++/impl/codegen/async_unary_call.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
#define GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
+#include <assert.h>
#include <grpc++/impl/codegen/call.h>
#include <grpc++/impl/codegen/channel_interface.h>
#include <grpc++/impl/codegen/client_context.h>
@@ -59,57 +60,67 @@ class ClientAsyncResponseReader final
: public ClientAsyncResponseReaderInterface<R> {
public:
template <class W>
- ClientAsyncResponseReader(ChannelInterface* channel, CompletionQueue* cq,
- const RpcMethod& method, ClientContext* context,
- const W& request)
- : context_(context),
- call_(channel->CreateCall(method, context, cq)),
- collection_(std::make_shared<CallOpSetCollection>()) {
- collection_->init_buf_.SetCollection(collection_);
- collection_->init_buf_.SendInitialMetadata(
- context->send_initial_metadata_, context->initial_metadata_flags());
- // TODO(ctiller): don't assert
- GPR_CODEGEN_ASSERT(collection_->init_buf_.SendMessage(request).ok());
- collection_->init_buf_.ClientSendClose();
- call_.PerformOps(&collection_->init_buf_);
+ static ClientAsyncResponseReader* Create(ChannelInterface* channel,
+ CompletionQueue* cq,
+ const RpcMethod& method,
+ ClientContext* context,
+ const W& request) {
+ Call call = channel->CreateCall(method, context, cq);
+ return new (g_core_codegen_interface->grpc_call_arena_alloc(
+ call.call(), sizeof(ClientAsyncResponseReader)))
+ ClientAsyncResponseReader(call, context, request);
+ }
+
+ // always allocated against a call arena, no memory free required
+ static void operator delete(void* ptr, std::size_t size) {
+ assert(size == sizeof(ClientAsyncResponseReader));
}
void ReadInitialMetadata(void* tag) {
GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
- collection_->meta_buf_.SetCollection(collection_);
- collection_->meta_buf_.set_output_tag(tag);
- collection_->meta_buf_.RecvInitialMetadata(context_);
- call_.PerformOps(&collection_->meta_buf_);
+ meta_buf_.set_output_tag(tag);
+ meta_buf_.RecvInitialMetadata(context_);
+ call_.PerformOps(&meta_buf_);
}
void Finish(R* msg, Status* status, void* tag) {
- collection_->finish_buf_.SetCollection(collection_);
- collection_->finish_buf_.set_output_tag(tag);
+ finish_buf_.set_output_tag(tag);
if (!context_->initial_metadata_received_) {
- collection_->finish_buf_.RecvInitialMetadata(context_);
+ finish_buf_.RecvInitialMetadata(context_);
}
- collection_->finish_buf_.RecvMessage(msg);
- collection_->finish_buf_.AllowNoMessage();
- collection_->finish_buf_.ClientRecvStatus(context_, status);
- call_.PerformOps(&collection_->finish_buf_);
+ finish_buf_.RecvMessage(msg);
+ finish_buf_.AllowNoMessage();
+ finish_buf_.ClientRecvStatus(context_, status);
+ call_.PerformOps(&finish_buf_);
}
private:
- ClientContext* context_;
+ ClientContext* const context_;
Call call_;
- class CallOpSetCollection : public CallOpSetCollectionInterface {
- public:
- SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
- CallOpClientSendClose>
- init_buf_;
- CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
- CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
- CallOpClientRecvStatus>
- finish_buf_;
- };
- std::shared_ptr<CallOpSetCollection> collection_;
+ template <class W>
+ ClientAsyncResponseReader(Call call, ClientContext* context, const W& request)
+ : context_(context), call_(call) {
+ init_buf_.SendInitialMetadata(context->send_initial_metadata_,
+ context->initial_metadata_flags());
+ // TODO(ctiller): don't assert
+ GPR_CODEGEN_ASSERT(init_buf_.SendMessage(request).ok());
+ init_buf_.ClientSendClose();
+ call_.PerformOps(&init_buf_);
+ }
+
+ // disable operator new
+ static void* operator new(std::size_t size);
+ static void* operator new(std::size_t size, void* p) { return p; };
+
+ SneakyCallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
+ CallOpClientSendClose>
+ init_buf_;
+ CallOpSet<CallOpRecvInitialMetadata> meta_buf_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>,
+ CallOpClientRecvStatus>
+ finish_buf_;
};
template <class W>
@@ -179,4 +190,12 @@ class ServerAsyncResponseWriter final : public ServerAsyncStreamingInterface {
} // namespace grpc
+namespace std {
+template <class R>
+class default_delete<grpc::ClientAsyncResponseReader<R>> {
+ public:
+ void operator()(void* p) {}
+};
+}
+
#endif // GRPCXX_IMPL_CODEGEN_ASYNC_UNARY_CALL_H
diff --git a/include/grpc++/impl/codegen/call.h b/include/grpc++/impl/codegen/call.h
index dd63c21ff1..e477193720 100644
--- a/include/grpc++/impl/codegen/call.h
+++ b/include/grpc++/impl/codegen/call.h
@@ -34,6 +34,7 @@
#ifndef GRPCXX_IMPL_CODEGEN_CALL_H
#define GRPCXX_IMPL_CODEGEN_CALL_H
+#include <assert.h>
#include <cstring>
#include <functional>
#include <map>
@@ -47,9 +48,9 @@
#include <grpc++/impl/codegen/serialization_traits.h>
#include <grpc++/impl/codegen/slice.h>
#include <grpc++/impl/codegen/status.h>
-#include <grpc++/impl/codegen/status_helper.h>
#include <grpc++/impl/codegen/string_ref.h>
+#include <grpc/impl/codegen/atm.h>
#include <grpc/impl/codegen/compression_types.h>
#include <grpc/impl/codegen/grpc_types.h>
@@ -468,7 +469,7 @@ class CallOpServerSendStatus {
trailing_metadata_ = FillMetadataArray(
trailing_metadata, &trailing_metadata_count_, send_error_details_);
send_status_available_ = true;
- send_status_code_ = static_cast<grpc_status_code>(GetCanonicalCode(status));
+ send_status_code_ = static_cast<grpc_status_code>(status.error_code());
send_error_message_ = status.error_message();
}
@@ -579,17 +580,6 @@ class CallOpClientRecvStatus {
grpc_slice error_message_;
};
-/// An abstract collection of CallOpSet's, to be used whenever
-/// CallOpSet objects must be thought of as a group. Each member
-/// of the group should have a shared_ptr back to the collection,
-/// as will the object that instantiates the collection, allowing
-/// for automatic ref-counting. In practice, any actual use should
-/// derive from this base class. This is specifically necessary if
-/// some of the CallOpSet's in the collection are "Sneaky" and don't
-/// report back to the C++ layer CQ operations
-class CallOpSetCollectionInterface
- : public std::enable_shared_from_this<CallOpSetCollectionInterface> {};
-
/// An abstract collection of call ops, used to generate the
/// grpc_call_op structure to pass down to the lower layers,
/// and as it is-a CompletionQueueTag, also massages the final
@@ -597,18 +587,9 @@ class CallOpSetCollectionInterface
/// API.
class CallOpSetInterface : public CompletionQueueTag {
public:
- CallOpSetInterface() {}
/// Fills in grpc_op, starting from ops[*nops] and moving
/// upwards.
- virtual void FillOps(grpc_op* ops, size_t* nops) = 0;
-
- /// Mark this as belonging to a collection if needed
- void SetCollection(std::shared_ptr<CallOpSetCollectionInterface> collection) {
- collection_ = collection;
- }
-
- protected:
- std::shared_ptr<CallOpSetCollectionInterface> collection_;
+ virtual void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) = 0;
};
/// Primary implementaiton of CallOpSetInterface.
@@ -629,13 +610,15 @@ class CallOpSet : public CallOpSetInterface,
public Op6 {
public:
CallOpSet() : return_tag_(this) {}
- void FillOps(grpc_op* ops, size_t* nops) override {
+ void FillOps(grpc_call* call, grpc_op* ops, size_t* nops) override {
this->Op1::AddOp(ops, nops);
this->Op2::AddOp(ops, nops);
this->Op3::AddOp(ops, nops);
this->Op4::AddOp(ops, nops);
this->Op5::AddOp(ops, nops);
this->Op6::AddOp(ops, nops);
+ g_core_codegen_interface->grpc_call_ref(call);
+ call_ = call;
}
bool FinalizeResult(void** tag, bool* status) override {
@@ -646,7 +629,7 @@ class CallOpSet : public CallOpSetInterface,
this->Op5::FinishOp(status);
this->Op6::FinishOp(status);
*tag = return_tag_;
- collection_.reset(); // drop the ref at this point
+ g_core_codegen_interface->grpc_call_unref(call_);
return true;
}
@@ -654,6 +637,7 @@ class CallOpSet : public CallOpSetInterface,
private:
void* return_tag_;
+ grpc_call* call_;
};
/// A CallOpSet that does not post completions to the completion queue.
diff --git a/include/grpc++/impl/codegen/core_codegen.h b/include/grpc++/impl/codegen/core_codegen.h
index 3cb7da8ef6..a1593729eb 100644
--- a/include/grpc++/impl/codegen/core_codegen.h
+++ b/include/grpc++/impl/codegen/core_codegen.h
@@ -75,6 +75,10 @@ class CoreCodegen final : public CoreCodegenInterface {
void gpr_cv_signal(gpr_cv* cv) override;
void gpr_cv_broadcast(gpr_cv* cv) override;
+ void grpc_call_ref(grpc_call* call) override;
+ void grpc_call_unref(grpc_call* call) override;
+ virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) override;
+
void grpc_byte_buffer_destroy(grpc_byte_buffer* bb) override;
int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader* reader,
diff --git a/include/grpc++/impl/codegen/core_codegen_interface.h b/include/grpc++/impl/codegen/core_codegen_interface.h
index a1a0aaf3ca..7cc3a82476 100644
--- a/include/grpc++/impl/codegen/core_codegen_interface.h
+++ b/include/grpc++/impl/codegen/core_codegen_interface.h
@@ -102,6 +102,10 @@ class CoreCodegenInterface {
virtual grpc_byte_buffer* grpc_raw_byte_buffer_create(grpc_slice* slice,
size_t nslices) = 0;
+ virtual void grpc_call_ref(grpc_call* call) = 0;
+ virtual void grpc_call_unref(grpc_call* call) = 0;
+ virtual void* grpc_call_arena_alloc(grpc_call* call, size_t length) = 0;
+
virtual grpc_slice grpc_empty_slice() = 0;
virtual grpc_slice grpc_slice_malloc(size_t length) = 0;
virtual void grpc_slice_unref(grpc_slice slice) = 0;
diff --git a/include/grpc++/impl/codegen/status_helper.h b/include/grpc++/impl/codegen/status_helper.h
deleted file mode 100644
index bfe45d9e5b..0000000000
--- a/include/grpc++/impl/codegen/status_helper.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- * Copyright 2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H
-#define GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H
-
-#include <grpc++/impl/codegen/status.h>
-
-namespace grpc {
-
-inline StatusCode GetCanonicalCode(const Status& status) {
- return status.error_code();
-}
-
-} // namespace grpc
-
-#endif // GRPCXX_IMPL_CODEGEN_STATUS_HELPER_H