aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc/byte_buffer.h4
-rw-r--r--src/core/iomgr/fd_posix.c5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c26
-rw-r--r--src/core/surface/byte_buffer.c14
-rw-r--r--src/core/transport/chttp2/parsing.c3
-rw-r--r--src/core/transport/chttp2_transport.c1
-rw-r--r--src/ruby/ext/grpc/extconf.rb2
-rw-r--r--test/core/surface/byte_buffer_reader_test.c25
-rw-r--r--test/cpp/qps/server_async.cc35
9 files changed, 91 insertions, 24 deletions
diff --git a/include/grpc/byte_buffer.h b/include/grpc/byte_buffer.h
index a62054ac19..913e2a7697 100644
--- a/include/grpc/byte_buffer.h
+++ b/include/grpc/byte_buffer.h
@@ -102,6 +102,10 @@ void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader);
int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
gpr_slice *slice);
+/** Returns a RAW byte buffer instance from the output of \a reader. */
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+ grpc_byte_buffer_reader *reader);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index e8c24c772a..6ad377ce1c 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -369,16 +369,17 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
watcher->fd = NULL;
watcher->pollset = NULL;
gpr_mu_unlock(&fd->watcher_mu);
+ GRPC_FD_UNREF(fd, "poll");
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
- if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
fd->write_watcher = watcher;
mask |= write_mask;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index dcf08d379c..1900bbf9e1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
-
- ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = fd;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0) {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
- strerror(errno));
+ grpc_fd_watcher watcher;
+
+ /* We pretend to be polling whilst adding an fd to keep the fd from being
+ closed during the add. This may result in a spurious wakeup being assigned
+ to this pollset whilst adding, but that should be benign. */
+ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+ if (watcher.fd != NULL) {
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
}
}
+ grpc_fd_end_poll(&watcher, 0, 0);
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 4817e00454..a930949f2d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -55,6 +55,20 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
return bb;
}
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+ grpc_byte_buffer_reader *reader) {
+ grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+ gpr_slice slice;
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = GRPC_COMPRESS_NONE;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+
+ while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+ }
+ return bb;
+}
+
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_RAW:
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8f682e9017..4664a0895c 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads(
transport_parsing->incoming_stream_id;
}
- /* TODO(ctiller): re-implement */
- GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 0a7b8f5bf9..0aa28da8f7 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -933,6 +933,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
+ t->parsing.initial_window_update = 0;
}
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb
index 0ff8bb9aa7..6dd0234489 100644
--- a/src/ruby/ext/grpc/extconf.rb
+++ b/src/ruby/ext/grpc/extconf.rb
@@ -89,7 +89,7 @@ $CFLAGS << ' -Wno-return-type '
$CFLAGS << ' -Wall '
$CFLAGS << ' -pedantic '
-$LDFLAGS << ' -lgrpc -lgpr -ldl'
+$LDFLAGS << ' -lgrpc -lgpr -lz -ldl'
crash('need grpc lib') unless have_library('grpc', 'grpc_channel_destroy')
have_library('grpc', 'grpc_channel_destroy')
diff --git a/test/core/surface/byte_buffer_reader_test.c b/test/core/surface/byte_buffer_reader_test.c
index 7c2cb9484a..d9c60e4212 100644
--- a/test/core/surface/byte_buffer_reader_test.c
+++ b/test/core/surface/byte_buffer_reader_test.c
@@ -160,6 +160,30 @@ static void test_read_deflate_compressed_slice(void) {
read_compressed_slice(GRPC_COMPRESS_DEFLATE, INPUT_SIZE);
}
+static void test_byte_buffer_from_reader(void) {
+ gpr_slice slice;
+ grpc_byte_buffer *buffer, *buffer_from_reader;
+ grpc_byte_buffer_reader reader;
+
+ LOG_TEST("test_byte_buffer_from_reader");
+ slice = gpr_slice_malloc(4);
+ memcpy(GPR_SLICE_START_PTR(slice), "test", 4);
+ buffer = grpc_raw_byte_buffer_create(&slice, 1);
+ gpr_slice_unref(slice);
+ grpc_byte_buffer_reader_init(&reader, buffer);
+
+ buffer_from_reader = grpc_raw_byte_buffer_from_reader(&reader);
+ GPR_ASSERT(buffer->type == buffer_from_reader->type);
+ GPR_ASSERT(buffer_from_reader->data.raw.compression == GRPC_COMPRESS_NONE);
+ GPR_ASSERT(buffer_from_reader->data.raw.slice_buffer.count == 1);
+ GPR_ASSERT(memcmp(GPR_SLICE_START_PTR(
+ buffer_from_reader->data.raw.slice_buffer.slices[0]),
+ "test", 4) == 0);
+
+ grpc_byte_buffer_destroy(buffer);
+ grpc_byte_buffer_destroy(buffer_from_reader);
+}
+
int main(int argc, char **argv) {
grpc_test_init(argc, argv);
test_read_one_slice();
@@ -167,6 +191,7 @@ int main(int argc, char **argv) {
test_read_none_compressed_slice();
test_read_gzip_compressed_slice();
test_read_deflate_compressed_slice();
+ test_byte_buffer_from_reader();
return 0;
}
diff --git a/test/cpp/qps/server_async.cc b/test/cpp/qps/server_async.cc
index 210aef4fd6..f5251e961b 100644
--- a/test/cpp/qps/server_async.cc
+++ b/test/cpp/qps/server_async.cc
@@ -64,7 +64,7 @@ namespace testing {
class AsyncQpsServerTest : public Server {
public:
- AsyncQpsServerTest(const ServerConfig &config, int port) : shutdown_(false) {
+ AsyncQpsServerTest(const ServerConfig &config, int port) {
char *server_address = NULL;
gpr_join_host_port(&server_address, "::", port);
@@ -97,6 +97,9 @@ class AsyncQpsServerTest : public Server {
}
}
for (int i = 0; i < config.threads(); i++) {
+ shutdown_state_.emplace_back(new PerThreadShutdownState());
+ }
+ for (int i = 0; i < config.threads(); i++) {
threads_.push_back(std::thread([=]() {
// Wait until work is available or we are shutting down
bool ok;
@@ -105,11 +108,9 @@ class AsyncQpsServerTest : public Server {
ServerRpcContext *ctx = detag(got_tag);
// The tag is a pointer to an RPC context to invoke
bool still_going = ctx->RunNextState(ok);
- std::unique_lock<std::mutex> g(shutdown_mutex_);
- if (!shutdown_) {
+ if (!shutdown_state_[i]->shutdown()) {
// this RPC context is done, so refresh it
if (!still_going) {
- g.unlock();
ctx->Reset();
}
} else {
@@ -122,9 +123,8 @@ class AsyncQpsServerTest : public Server {
}
~AsyncQpsServerTest() {
server_->Shutdown();
- {
- std::lock_guard<std::mutex> g(shutdown_mutex_);
- shutdown_ = true;
+ for (auto ss = shutdown_state_.begin(); ss != shutdown_state_.end(); ++ss) {
+ (*ss)->set_shutdown();
}
for (auto thr = threads_.begin(); thr != threads_.end(); thr++) {
thr->join();
@@ -316,8 +316,25 @@ class AsyncQpsServerTest : public Server {
TestService::AsyncService async_service_;
std::forward_list<ServerRpcContext *> contexts_;
- std::mutex shutdown_mutex_;
- bool shutdown_;
+ class PerThreadShutdownState {
+ public:
+ PerThreadShutdownState() : shutdown_(false) {}
+
+ bool shutdown() const {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return shutdown_;
+ }
+
+ void set_shutdown() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ shutdown_ = true;
+ }
+
+ private:
+ mutable std::mutex mutex_;
+ bool shutdown_;
+ };
+ std::vector<std::unique_ptr<PerThreadShutdownState>> shutdown_state_;
};
std::unique_ptr<Server> CreateAsyncServer(const ServerConfig &config,