diff options
-rw-r--r-- | include/grpc/byte_buffer.h | 4 | ||||
-rw-r--r-- | src/core/iomgr/fd_posix.c | 5 | ||||
-rw-r--r-- | src/core/iomgr/pollset_multipoller_with_epoll.c | 26 | ||||
-rw-r--r-- | src/core/surface/byte_buffer.c | 14 | ||||
-rw-r--r-- | src/core/transport/chttp2/parsing.c | 3 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 1 | ||||
-rw-r--r-- | src/ruby/ext/grpc/extconf.rb | 2 | ||||
-rw-r--r-- | test/core/surface/byte_buffer_reader_test.c | 25 | ||||
-rw-r--r-- | test/cpp/qps/server_async.cc | 35 |
9 files changed, 91 insertions, 24 deletions
diff --git a/include/grpc/byte_buffer.h b/include/grpc/byte_buffer.h index a62054ac19..913e2a7697 100644 --- a/include/grpc/byte_buffer.h +++ b/include/grpc/byte_buffer.h @@ -102,6 +102,10 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader); int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader, gpr_slice *slice); +/** Returns a RAW byte buffer instance from the output of \a reader. */ +grpc_byte_buffer *grpc_raw_byte_buffer_from_reader( + grpc_byte_buffer_reader *reader); + #ifdef __cplusplus } #endif diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c index e8c24c772a..6ad377ce1c 100644 --- a/src/core/iomgr/fd_posix.c +++ b/src/core/iomgr/fd_posix.c @@ -369,16 +369,17 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset, watcher->fd = NULL; watcher->pollset = NULL; gpr_mu_unlock(&fd->watcher_mu); + GRPC_FD_UNREF(fd, "poll"); return 0; } /* if there is nobody polling for read, but we need to, then start doing so */ - if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { + if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) { fd->read_watcher = watcher; mask |= read_mask; } /* if there is nobody polling for write, but we need to, then start doing so */ - if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { + if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) { fd->write_watcher = watcher; mask |= write_mask; } diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c index dcf08d379c..1900bbf9e1 100644 --- a/src/core/iomgr/pollset_multipoller_with_epoll.c +++ b/src/core/iomgr/pollset_multipoller_with_epoll.c @@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset, pollset_hdr *h = pollset->data.ptr; struct epoll_event ev; int err; - - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - ev.data.ptr = fd; - err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); - if (err < 0) { - /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ - if (errno != EEXIST) { - gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, - strerror(errno)); + grpc_fd_watcher watcher; + + /* We pretend to be polling whilst adding an fd to keep the fd from being + closed during the add. This may result in a spurious wakeup being assigned + to this pollset whilst adding, but that should be benign. */ + GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0); + if (watcher.fd != NULL) { + ev.events = EPOLLIN | EPOLLOUT | EPOLLET; + ev.data.ptr = fd; + err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev); + if (err < 0) { + /* FDs may be added to a pollset multiple times, so EEXIST is normal. */ + if (errno != EEXIST) { + gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd, + strerror(errno)); + } } } + grpc_fd_end_poll(&watcher, 0, 0); } static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset, diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c index 4817e00454..a930949f2d 100644 --- a/src/core/surface/byte_buffer.c +++ b/src/core/surface/byte_buffer.c @@ -55,6 +55,20 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create( return bb; } +grpc_byte_buffer *grpc_raw_byte_buffer_from_reader( + grpc_byte_buffer_reader *reader) { + grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer)); + gpr_slice slice; + bb->type = GRPC_BB_RAW; + bb->data.raw.compression = GRPC_COMPRESS_NONE; + gpr_slice_buffer_init(&bb->data.raw.slice_buffer); + + while (grpc_byte_buffer_reader_next(reader, &slice)) { + gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice); + } + return bb; +} + grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) { switch (bb->type) { case GRPC_BB_RAW: diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c index 8f682e9017..4664a0895c 100644 --- a/src/core/transport/chttp2/parsing.c +++ b/src/core/transport/chttp2/parsing.c @@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads( transport_parsing->incoming_stream_id; } - /* TODO(ctiller): re-implement */ - GPR_ASSERT(transport_parsing->initial_window_update == 0); - /* copy parsing qbuf to global qbuf */ gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf); diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index 0a7b8f5bf9..0aa28da8f7 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -933,6 +933,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices, if (t->parsing.initial_window_update != 0) { grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, update_global_window, t); + t->parsing.initial_window_update = 0; } /* handle higher level things */ grpc_chttp2_publish_reads(&t->global, &t->parsing); diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index 0ff8bb9aa7..6dd0234489 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -89,7 +89,7 @@ $CFLAGS << ' -Wno-return-type ' $CFLAGS << ' -Wall ' $CFLAGS << ' -pedantic ' -$LDFLAGS << ' -lgrpc -lgpr -ldl' +$LDFLAGS << ' -lgrpc -lgpr -lz -ldl' crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy') have_library('grpc', 'grpc_channel_destroy') diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c index 7c2cb9484a..d9c60e4212 100644 --- a/test/core/surface/byte_buffer_reader_test.c +++ b/test/core/surface/byte_buffer_reader_test.c @@ -160,6 +160,30 @@ static void test_read_deflate_compressed_slice(void) { read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE); } +static void test_byte_buffer_from_reader(void) { + gpr_slice slice; + grpc_byte_buffer *buffer, *buffer_from_reader; + grpc_byte_buffer_reader reader; + + LOG_TEST("test_byte_buffer_from_reader"); + slice = gpr_slice_malloc(4); + memcpy(GPR_SLICE_START_PTR(slice), "test", 4); + buffer = grpc_raw_byte_buffer_create(&slice, 1); + gpr_slice_unref(slice); + grpc_byte_buffer_reader_init(&reader, buffer); + + buffer_from_reader = grpc_raw_byte_buffer_from_reader(&reader); + GPR_ASSERT(buffer->type == buffer_from_reader->type); + GPR_ASSERT(buffer_from_reader->data.raw.compression == GRPC_COMPRESS_NONE); + GPR_ASSERT(buffer_from_reader->data.raw.slice_buffer.count == 1); + GPR_ASSERT(memcmp(GPR_SLICE_START_PTR( + buffer_from_reader->data.raw.slice_buffer.slices[0]), + "test", 4) == 0); + + grpc_byte_buffer_destroy(buffer); + grpc_byte_buffer_destroy(buffer_from_reader); +} + int main(int argc, char **argv) { grpc_test_init(argc, argv); test_read_one_slice(); @@ -167,6 +191,7 @@ int main(int argc, char **argv) { test_read_none_compressed_slice(); test_read_gzip_compressed_slice(); test_read_deflate_compressed_slice(); + test_byte_buffer_from_reader(); return 0; } diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc index 210aef4fd6..f5251e961b 100644 --- a/test/cpp/qps/server_async.cc +++ b/test/cpp/qps/server_async.cc @@ -64,7 +64,7 @@ namespace testing { class AsyncQpsServerTest : public Server { public: - AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) { + AsyncQpsServerTest(const ServerConfig &config, int port) { char *server_address = NULL; gpr_join_host_port(&server_address, "::", port); @@ -97,6 +97,9 @@ class AsyncQpsServerTest : public Server { } } for (int i = 0; i < config.threads(); i++) { + shutdown_state_.emplace_back(new PerThreadShutdownState()); + } + for (int i = 0; i < config.threads(); i++) { threads_.push_back(std::thread([=]() { // Wait until work is available or we are shutting down bool ok; @@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server { ServerRpcContext *ctx = detag(got_tag); // The tag is a pointer to an RPC context to invoke bool still_going = ctx->RunNextState(ok); - std::unique_lock<std::mutex> g(shutdown_mutex_); - if (!shutdown_) { + if (!shutdown_state_[i]->shutdown()) { // this RPC context is done, so refresh it if (!still_going) { - g.unlock(); ctx->Reset(); } } else { @@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server { } ~AsyncQpsServerTest() { server_->Shutdown(); - { - std::lock_guard<std::mutex> g(shutdown_mutex_); - shutdown_ = true; + for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) { + (*ss)->set_shutdown(); } for (auto thr = threads_.begin(); thr != threads_.end(); thr++) { thr->join(); @@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server { TestService::AsyncService async_service_; std::forward_list<ServerRpcContext *> contexts_; - std::mutex shutdown_mutex_; - bool shutdown_; + class PerThreadShutdownState { + public: + PerThreadShutdownState() : shutdown_(false) {} + + bool shutdown() const { + std::lock_guard<std::mutex> lock(mutex_); + return shutdown_; + } + + void set_shutdown() { + std::lock_guard<std::mutex> lock(mutex_); + shutdown_ = true; + } + + private: + mutable std::mutex mutex_; + bool shutdown_; + }; + std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_; }; std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config, |