aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/core/transport/chttp2/internal.h9
-rw-r--r--src/core/transport/chttp2/stream_lists.c22
-rw-r--r--src/core/transport/chttp2/writing.c10
-rw-r--r--src/core/transport/chttp2_transport.c7
-rw-r--r--src/cpp/common/alarm.cc44
-rw-r--r--src/cpp/server/server.cc14
-rw-r--r--src/cpp/server/server_builder.cc2
7 files changed, 43 insertions, 65 deletions
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index 0e1e2c4265..d76d31be23 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -485,7 +485,8 @@ struct grpc_chttp2_stream {
/** Someone is unlocking the transport mutex: check to see if writes
are required, and schedule them if so */
-int grpc_chttp2_unlocking_check_writes(grpc_chttp2_transport_global *global,
+int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx,
+ grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing,
int is_parsing);
void grpc_chttp2_perform_writes(
@@ -568,8 +569,12 @@ void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing);
void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_chttp2_transport_writing *transport_writing, bool is_window_available);
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
+ bool is_window_available);
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing);
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global);
diff --git a/src/core/transport/chttp2/stream_lists.c b/src/core/transport/chttp2/stream_lists.c
index 2f31a47cb3..b284c78818 100644
--- a/src/core/transport/chttp2/stream_lists.c
+++ b/src/core/transport/chttp2/stream_lists.c
@@ -316,13 +316,16 @@ int grpc_chttp2_list_pop_check_read_ops(
void grpc_chttp2_list_add_writing_stalled_by_transport(
grpc_chttp2_transport_writing *transport_writing,
grpc_chttp2_stream_writing *stream_writing) {
- stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
- STREAM_FROM_WRITING(stream_writing),
+ grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing);
+ if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) {
+ GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled");
+ }
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), stream,
GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT);
}
void grpc_chttp2_list_flush_writing_stalled_by_transport(
- grpc_chttp2_transport_writing *transport_writing,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
bool is_window_available) {
grpc_chttp2_stream *stream;
grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing);
@@ -331,11 +334,22 @@ void grpc_chttp2_list_flush_writing_stalled_by_transport(
if (is_window_available) {
grpc_chttp2_list_add_writable_stream(&transport->global, &stream->global);
} else {
- stream_list_add(transport, stream, GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ &stream->writing);
}
+ GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global,
+ "chttp2_writing_stalled");
}
}
+void grpc_chttp2_list_add_stalled_by_transport(
+ grpc_chttp2_transport_writing *transport_writing,
+ grpc_chttp2_stream_writing *stream_writing) {
+ stream_list_add(TRANSPORT_FROM_WRITING(transport_writing),
+ STREAM_FROM_WRITING(stream_writing),
+ GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT);
+}
+
int grpc_chttp2_list_pop_stalled_by_transport(
grpc_chttp2_transport_global *transport_global,
grpc_chttp2_stream_global **stream_global) {
diff --git a/src/core/transport/chttp2/writing.c b/src/core/transport/chttp2/writing.c
index cafecf1046..356fd8174a 100644
--- a/src/core/transport/chttp2/writing.c
+++ b/src/core/transport/chttp2/writing.c
@@ -44,7 +44,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_writing *transport_writing);
int grpc_chttp2_unlocking_check_writes(
- grpc_chttp2_transport_global *transport_global,
+ grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;
@@ -76,8 +76,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
transport_global, outgoing_window);
bool is_window_available = transport_writing->outgoing_window > 0;
- grpc_chttp2_list_flush_writing_stalled_by_transport(transport_writing,
- is_window_available);
+ grpc_chttp2_list_flush_writing_stalled_by_transport(
+ exec_ctx, transport_writing, is_window_available);
/* for each grpc_chttp2_stream that's become writable, frame it's data
(according to available window sizes) and add to the output buffer */
@@ -133,8 +133,8 @@ int grpc_chttp2_unlocking_check_writes(
GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
}
} else {
- grpc_chttp2_list_add_writing_stalled_by_transport(transport_writing,
- stream_writing);
+ grpc_chttp2_list_add_stalled_by_transport(transport_writing,
+ stream_writing);
}
}
if (stream_global->send_trailing_metadata) {
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 617d98875c..b9f511e946 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -598,7 +598,7 @@ static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
GPR_TIMER_BEGIN("unlock", 0);
if (!t->writing_active && !t->closed &&
- grpc_chttp2_unlocking_check_writes(&t->global, &t->writing,
+ grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
t->parsing_active)) {
t->writing_active = 1;
REF_TRANSPORT(t, "writing");
@@ -1019,6 +1019,11 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
+ while (stream_global->seen_error &&
+ (bs = grpc_chttp2_incoming_frame_queue_pop(
+ &stream_global->incoming_frames)) != NULL) {
+ grpc_byte_stream_destroy(exec_ctx, bs);
+ }
if (stream_global->incoming_frames.head != NULL) {
*stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames);
diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc
deleted file mode 100644
index 0c96be20da..0000000000
--- a/src/cpp/common/alarm.cc
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Copyright 2015-2016, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/alarm.h>
-#include <grpc++/impl/grpc_library.h>
-
-namespace grpc {
-
-Alarm::~Alarm() {
- grpc_alarm_destroy(alarm_);
-}
-
-void Alarm::Cancel() { grpc_alarm_cancel(alarm_); }
-
-} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 0d31140924..6d31a608c8 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -272,27 +272,25 @@ class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
grpc_completion_queue* cq_;
};
-static grpc_server* CreateServer(const ChannelArguments& args) {
- grpc_channel_args channel_args;
- args.SetChannelArgs(&channel_args);
- return grpc_server_create(&channel_args, nullptr);
-}
-
static internal::GrpcLibraryInitializer g_gli_initializer;
Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
- int max_message_size, const ChannelArguments& args)
+ int max_message_size, ChannelArguments* args)
: max_message_size_(max_message_size),
started_(false),
shutdown_(false),
num_running_cb_(0),
sync_methods_(new std::list<SyncRequest>),
has_generic_service_(false),
- server_(CreateServer(args)),
+ server_(nullptr),
thread_pool_(thread_pool),
thread_pool_owned_(thread_pool_owned) {
g_gli_initializer.summon();
gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
global_callbacks_ = g_callbacks;
+ global_callbacks_->UpdateArguments(args);
+ grpc_channel_args channel_args;
+ args->SetChannelArgs(&channel_args);
+ server_ = grpc_server_create(&channel_args, nullptr);
grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
}
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index c54cf6474f..134e5f1d5f 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -103,7 +103,7 @@ std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
args.SetInt(GRPC_COMPRESSION_ALGORITHM_STATE_ARG,
compression_options_.enabled_algorithms_bitset);
std::unique_ptr<Server> server(
- new Server(thread_pool.release(), true, max_message_size_, args));
+ new Server(thread_pool.release(), true, max_message_size_, &args));
for (auto cq = cqs_.begin(); cq != cqs_.end(); ++cq) {
grpc_server_register_completion_queue(server->server_, (*cq)->cq(),
nullptr);