diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/core/transport/chttp2/internal.h | 9 | ||||
-rw-r--r-- | src/core/transport/chttp2/stream_lists.c | 22 | ||||
-rw-r--r-- | src/core/transport/chttp2/writing.c | 10 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 7 | ||||
-rw-r--r-- | src/cpp/common/alarm.cc | 44 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 14 | ||||
-rw-r--r-- | src/cpp/server/server_builder.cc | 2 |
7 files changed, 43 insertions, 65 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h index 0e1e2c4265..d76d31be23 100644 --- a/src/core/transport/chttp2/internal.h +++ b/src/core/transport/chttp2/internal.h @@ -485,7 +485,8 @@ struct grpc_chttp2_stream { /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ -int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global, +int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *global, grpc_chttp2_transport_writing *writing, int is_parsing); void grpc_chttp2_perform_writes( @@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, bool is_window_available); + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, + bool is_window_available); +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing); int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global); diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c index 2f31a47cb3..b284c78818 100644 --- a/src/core/transport/chttp2/stream_lists.c +++ b/src/core/transport/chttp2/stream_lists.c @@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { - stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), - STREAM_FROM_WRITING(stream_writing), + grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { + GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); + } + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_chttp2_transport_writing *transport_writing, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, bool is_window_available) { grpc_chttp2_stream *stream; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); @@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport( if (is_window_available) { grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global); } else { - stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); } + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, + "chttp2_writing_stalled"); } } +void grpc_chttp2_list_add_stalled_by_transport( + grpc_chttp2_transport_writing *transport_writing, + grpc_chttp2_stream_writing *stream_writing) { + stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), + STREAM_FROM_WRITING(stream_writing), + GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); +} + int grpc_chttp2_list_pop_stalled_by_transport( grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global **stream_global) { diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c index cafecf1046..356fd8174a 100644 --- a/src/core/transport/chttp2/writing.c +++ b/src/core/transport/chttp2/writing.c @@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); int grpc_chttp2_unlocking_check_writes( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_transport_writing *transport_writing, int is_parsing) { grpc_chttp2_stream_global *stream_global; grpc_chttp2_stream_writing *stream_writing; @@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing, - is_window_available); + grpc_chttp2_list_flush_writing_stalled_by_transport( + exec_ctx, transport_writing, is_window_available); /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); } } else { - grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing, - stream_writing); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + stream_writing); } } if (stream_global->send_trailing_metadata) { diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 617d98875c..b9f511e946 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); } static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { GPR_TIMER_BEGIN("unlock", 0); if (!t->writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(&t->global, &t->writing, + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing, t->parsing_active)) { t->writing_active = 1; REF_TRANSPORT(t, "writing"); @@ -1019,6 +1019,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, stream_global->recv_initial_metadata_ready = NULL; } if (stream_global->recv_message_ready != NULL) { + while (stream_global->seen_error && + (bs = grpc_chttp2_incoming_frame_queue_pop( + &stream_global->incoming_frames)) != NULL) { + grpc_byte_stream_destroy(exec_ctx, bs); + } if (stream_global->incoming_frames.head != NULL) { *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop( &stream_global->incoming_frames); diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc deleted file mode 100644 index 0c96be20da..0000000000 --- a/src/cpp/common/alarm.cc +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2015-2016, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#include <grpc++/alarm.h> -#include <grpc++/impl/grpc_library.h> - -namespace grpc { - -Alarm::~Alarm() { - grpc_alarm_destroy(alarm_); -} - -void Alarm::Cancel() { grpc_alarm_cancel(alarm_); } - -} // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index 0d31140924..6d31a608c8 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -272,27 +272,25 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag { grpc_completion_queue* cq_; }; -static grpc_server* CreateServer(const ChannelArguments& args) { - grpc_channel_args channel_args; - args.SetChannelArgs(&channel_args); - return grpc_server_create(&channel_args, nullptr); -} - static internal::GrpcLibraryInitializer g_gli_initializer; Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, - int max_message_size, const ChannelArguments& args) + int max_message_size, ChannelArguments* args) : max_message_size_(max_message_size), started_(false), shutdown_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), - server_(CreateServer(args)), + server_(nullptr), thread_pool_(thread_pool), thread_pool_owned_(thread_pool_owned) { g_gli_initializer.summon(); gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks); global_callbacks_ = g_callbacks; + global_callbacks_->UpdateArguments(args); + grpc_channel_args channel_args; + args->SetChannelArgs(&channel_args); + server_ = grpc_server_create(&channel_args, nullptr); grpc_server_register_completion_queue(server_, cq_.cq(), nullptr); } diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc index c54cf6474f..134e5f1d5f 100644 --- a/src/cpp/server/server_builder.cc +++ b/src/cpp/server/server_builder.cc @@ -103,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() { args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG, compression_options_.enabled_algorithms_bitset); std::unique_ptr<Server> server( - new Server(thread_pool.release(), true, max_message_size_, args)); + new Server(thread_pool.release(), true, max_message_size_, &args)); for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) { grpc_server_register_completion_queue(server->server_, (*cq)->cq(), nullptr); |