aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-07-07 11:05:10 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-07-07 11:05:10 -0700
commite822963e8b2285daa02becf6fe5a8e5ad2935c91 (patch)
tree48c7910e04ed85326a7dc0eb5501e94c19704bcd /src/core
parent9188d7a2df8217aa9414113a215668118a0dd700 (diff)
parent361d1a9b8a0b4aeb3302edd9d9a5e52fb90d2d0d (diff)
Merge github.com:grpc/grpc into just-say-goodbye-when-we-are-done
Diffstat (limited to 'src/core')
-rw-r--r--src/core/iomgr/alarm.h2
-rw-r--r--src/core/iomgr/fd_posix.c5
-rw-r--r--src/core/iomgr/pollset_multipoller_with_epoll.c26
-rw-r--r--src/core/support/log_linux.c11
-rw-r--r--src/core/surface/byte_buffer.c14
-rw-r--r--src/core/surface/call.c8
-rw-r--r--src/core/surface/call.h2
-rw-r--r--src/core/transport/chttp2/incoming_metadata.c1
-rw-r--r--src/core/transport/chttp2/parsing.c3
-rw-r--r--src/core/transport/chttp2_transport.c1
-rw-r--r--src/core/transport/stream_op.h2
11 files changed, 51 insertions, 24 deletions
diff --git a/src/core/iomgr/alarm.h b/src/core/iomgr/alarm.h
index e5262e2199..c067a0b8a3 100644
--- a/src/core/iomgr/alarm.h
+++ b/src/core/iomgr/alarm.h
@@ -41,9 +41,9 @@
typedef struct grpc_alarm {
gpr_timespec deadline;
gpr_uint32 heap_index; /* INVALID_HEAP_INDEX if not in heap */
+ int triggered;
struct grpc_alarm *next;
struct grpc_alarm *prev;
- int triggered;
grpc_iomgr_cb_func cb;
void *cb_arg;
} grpc_alarm;
diff --git a/src/core/iomgr/fd_posix.c b/src/core/iomgr/fd_posix.c
index e8c24c772a..6ad377ce1c 100644
--- a/src/core/iomgr/fd_posix.c
+++ b/src/core/iomgr/fd_posix.c
@@ -369,16 +369,17 @@ gpr_uint32 grpc_fd_begin_poll(grpc_fd *fd, grpc_pollset *pollset,
watcher->fd = NULL;
watcher->pollset = NULL;
gpr_mu_unlock(&fd->watcher_mu);
+ GRPC_FD_UNREF(fd, "poll");
return 0;
}
/* if there is nobody polling for read, but we need to, then start doing so */
- if (!fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
+ if (read_mask && !fd->read_watcher && gpr_atm_acq_load(&fd->readst) > READY) {
fd->read_watcher = watcher;
mask |= read_mask;
}
/* if there is nobody polling for write, but we need to, then start doing so
*/
- if (!fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
+ if (write_mask && !fd->write_watcher && gpr_atm_acq_load(&fd->writest) > READY) {
fd->write_watcher = watcher;
mask |= write_mask;
}
diff --git a/src/core/iomgr/pollset_multipoller_with_epoll.c b/src/core/iomgr/pollset_multipoller_with_epoll.c
index dcf08d379c..1900bbf9e1 100644
--- a/src/core/iomgr/pollset_multipoller_with_epoll.c
+++ b/src/core/iomgr/pollset_multipoller_with_epoll.c
@@ -54,17 +54,25 @@ static void multipoll_with_epoll_pollset_add_fd(grpc_pollset *pollset,
pollset_hdr *h = pollset->data.ptr;
struct epoll_event ev;
int err;
-
- ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
- ev.data.ptr = fd;
- err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
- if (err < 0) {
- /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
- if (errno != EEXIST) {
- gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
- strerror(errno));
+ grpc_fd_watcher watcher;
+
+ /* We pretend to be polling whilst adding an fd to keep the fd from being
+ closed during the add. This may result in a spurious wakeup being assigned
+ to this pollset whilst adding, but that should be benign. */
+ GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, 0, 0, &watcher) == 0);
+ if (watcher.fd != NULL) {
+ ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+ ev.data.ptr = fd;
+ err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
+ if (err < 0) {
+ /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
+ if (errno != EEXIST) {
+ gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
+ strerror(errno));
+ }
}
}
+ grpc_fd_end_poll(&watcher, 0, 0);
}
static void multipoll_with_epoll_pollset_del_fd(grpc_pollset *pollset,
diff --git a/src/core/support/log_linux.c b/src/core/support/log_linux.c
index 48349d2c83..7937466b79 100644
--- a/src/core/support/log_linux.c
+++ b/src/core/support/log_linux.c
@@ -43,7 +43,9 @@
#ifdef GPR_LINUX
+#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
+#include <grpc/support/string_util.h>
#include <grpc/support/time.h>
#include <stdio.h>
#include <stdarg.h>
@@ -71,6 +73,7 @@ void gpr_log(const char *file, int line, gpr_log_severity severity,
void gpr_default_log(gpr_log_func_args *args) {
char *final_slash;
+ char *prefix;
const char *display_file;
char time_buffer[64];
gpr_timespec now = gpr_now();
@@ -89,10 +92,12 @@ void gpr_default_log(gpr_log_func_args *args) {
strcpy(time_buffer, "error:strftime");
}
- fprintf(stderr, "%s%s.%09d %7ld %s:%d] %s\n",
+ gpr_asprintf(&prefix, "%s%s.%09d %7tu %s:%d]",
gpr_log_severity_string(args->severity), time_buffer,
- (int)(now.tv_nsec), gettid(), display_file, args->line,
- args->message);
+ (int)(now.tv_nsec), gettid(), display_file, args->line);
+
+ fprintf(stderr, "%-60s %s\n", prefix, args->message);
+ gpr_free(prefix);
}
#endif
diff --git a/src/core/surface/byte_buffer.c b/src/core/surface/byte_buffer.c
index 4817e00454..a930949f2d 100644
--- a/src/core/surface/byte_buffer.c
+++ b/src/core/surface/byte_buffer.c
@@ -55,6 +55,20 @@ grpc_byte_buffer *grpc_raw_compressed_byte_buffer_create(
return bb;
}
+grpc_byte_buffer *grpc_raw_byte_buffer_from_reader(
+ grpc_byte_buffer_reader *reader) {
+ grpc_byte_buffer *bb = malloc(sizeof(grpc_byte_buffer));
+ gpr_slice slice;
+ bb->type = GRPC_BB_RAW;
+ bb->data.raw.compression = GRPC_COMPRESS_NONE;
+ gpr_slice_buffer_init(&bb->data.raw.slice_buffer);
+
+ while (grpc_byte_buffer_reader_next(reader, &slice)) {
+ gpr_slice_buffer_add(&bb->data.raw.slice_buffer, slice);
+ }
+ return bb;
+}
+
grpc_byte_buffer *grpc_byte_buffer_copy(grpc_byte_buffer *bb) {
switch (bb->type) {
case GRPC_BB_RAW:
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index ae1b215767..fc09137b67 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -76,14 +76,14 @@ typedef struct {
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
non-OK */
- int success;
- /* Completion function to call at the end of the operation */
- grpc_ioreq_completion_func on_complete;
- void *user_data;
+ gpr_uint8 success;
/* a bit mask of which request ops are needed (1u << opid) */
gpr_uint16 need_mask;
/* a bit mask of which request ops are now completed */
gpr_uint16 complete_mask;
+ /* Completion function to call at the end of the operation */
+ grpc_ioreq_completion_func on_complete;
+ void *user_data;
} reqinfo_master;
/* Status data for a request can come from several sources; this
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index fb3662b50d..3b6f9c942e 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -78,8 +78,8 @@ typedef union {
typedef struct {
grpc_ioreq_op op;
- grpc_ioreq_data data;
gpr_uint32 flags; /**< A copy of the write flags from grpc_op */
+ grpc_ioreq_data data;
} grpc_ioreq;
typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
diff --git a/src/core/transport/chttp2/incoming_metadata.c b/src/core/transport/chttp2/incoming_metadata.c
index a4b7174329..68e0912b9c 100644
--- a/src/core/transport/chttp2/incoming_metadata.c
+++ b/src/core/transport/chttp2/incoming_metadata.c
@@ -124,6 +124,7 @@ void grpc_incoming_metadata_buffer_move_to_referencing_sopb(
sopb->ops[i].data.metadata.list.tail =
(void *)(delta + (gpr_intptr)sopb->ops[i].data.metadata.list.tail);
}
+ src->count = 0;
}
void grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
diff --git a/src/core/transport/chttp2/parsing.c b/src/core/transport/chttp2/parsing.c
index 8f682e9017..4664a0895c 100644
--- a/src/core/transport/chttp2/parsing.c
+++ b/src/core/transport/chttp2/parsing.c
@@ -109,9 +109,6 @@ void grpc_chttp2_publish_reads(
transport_parsing->incoming_stream_id;
}
- /* TODO(ctiller): re-implement */
- GPR_ASSERT(transport_parsing->initial_window_update == 0);
-
/* copy parsing qbuf to global qbuf */
gpr_slice_buffer_move_into(&transport_parsing->qbuf, &transport_global->qbuf);
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index d955de8865..391577c948 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -942,6 +942,7 @@ static void recv_data(void *tp, gpr_slice *slices, size_t nslices,
if (t->parsing.initial_window_update != 0) {
grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
update_global_window, t);
+ t->parsing.initial_window_update = 0;
}
/* handle higher level things */
grpc_chttp2_publish_reads(&t->global, &t->parsing);
diff --git a/src/core/transport/stream_op.h b/src/core/transport/stream_op.h
index 842fc932b9..964d39d14f 100644
--- a/src/core/transport/stream_op.h
+++ b/src/core/transport/stream_op.h
@@ -41,7 +41,7 @@
#include "src/core/transport/metadata.h"
/* this many stream ops are inlined into a sopb before allocating */
-#define GRPC_SOPB_INLINE_ELEMENTS 16
+#define GRPC_SOPB_INLINE_ELEMENTS 4
/* Operations that can be performed on a stream.
Used by grpc_stream_op. */