diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/iomgr/pollset_posix.c | 7 | ||||
-rw-r--r-- | src/core/iomgr/tcp_posix.c | 4 | ||||
-rw-r--r-- | src/core/profiling/basic_timers.c | 32 | ||||
-rw-r--r-- | src/core/profiling/stap_timers.c | 13 | ||||
-rw-r--r-- | src/core/profiling/timers.h | 41 | ||||
-rw-r--r-- | src/core/security/security_connector.c | 2 | ||||
-rw-r--r-- | src/core/security/security_connector.h | 4 | ||||
-rw-r--r-- | src/core/support/subprocess_posix.c | 8 | ||||
-rw-r--r-- | src/core/surface/call.c | 61 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 4 | ||||
-rw-r--r-- | src/core/surface/server.c | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_rst_stream.c | 40 | ||||
-rw-r--r-- | src/core/transport/chttp2/frame_rst_stream.h | 11 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.c | 4 | ||||
-rw-r--r-- | src/core/transport/chttp2/hpack_parser.h | 2 | ||||
-rw-r--r-- | src/core/transport/chttp2_transport.c | 117 |
17 files changed, 242 insertions, 112 deletions
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c index 4d1bcad9e2..ab1af0d4ee 100644 --- a/src/core/iomgr/pollset_posix.c +++ b/src/core/iomgr/pollset_posix.c @@ -258,7 +258,6 @@ static void unary_poll_do_promote(void *args, int success) { grpc_pollset *pollset = up_args->pollset; grpc_fd *fd = up_args->fd; int do_shutdown_cb = 0; - gpr_free(up_args); /* * This is quite tricky. There are a number of cases to keep in mind here: @@ -273,8 +272,12 @@ static void unary_poll_do_promote(void *args, int success) { /* First we need to ensure that nobody is polling concurrently */ while (pollset->counter != 0) { grpc_pollset_kick(pollset); - gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future); + grpc_iomgr_add_callback(unary_poll_do_promote, up_args); + gpr_mu_unlock(&pollset->mu); + return; } + + gpr_free(up_args); /* At this point the pollset may no longer be a unary poller. In that case * we should just call the right add function and be done. */ /* TODO(klempner): If we're not careful this could cause infinite recursion. diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c index f7dae5f86c..cd6b2ecae6 100644 --- a/src/core/iomgr/tcp_posix.c +++ b/src/core/iomgr/tcp_posix.c @@ -138,8 +138,10 @@ static void slice_state_remove_prefix(grpc_tcp_slice_state *state, native "trim the first N bytes" operation to splice */ /* TODO(klempner): This really shouldn't be modifying the current slice unless we own the slices array. */ - *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes); + gpr_slice tail; + tail = gpr_slice_split_tail(current_slice, prefix_bytes); gpr_slice_unref(*current_slice); + *current_slice = tail; return; } else { gpr_slice_unref(*current_slice); diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index 124a8d6621..ae37f584eb 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -55,6 +55,7 @@ typedef enum { typedef struct grpc_timer_entry { grpc_precise_clock tm; int tag; + const char* tagstr; marker_type type; void* id; const char* file; @@ -70,18 +71,19 @@ static void log_report() { int i; for (i = 0; i < count; i++) { grpc_timer_entry* entry = &(log[i]); - printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT " %p %c %d %p %s %d\n", + printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT + " %p %c %d(%s) %p %s %d\n", GRPC_PRECISE_CLOCK_PRINTF_ARGS(&entry->tm), (void*)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tag, - entry->id, entry->file, entry->line); + entry->tagstr, entry->id, entry->file, entry->line); } /* Now clear out the log */ count = 0; } -static void grpc_timers_log_add(int tag, marker_type type, void* id, - const char* file, int line) { +static void grpc_timers_log_add(int tag, const char* tagstr, marker_type type, + void* id, const char* file, int line) { grpc_timer_entry* entry; /* TODO (vpai) : Improve concurrency */ @@ -93,6 +95,7 @@ static void grpc_timers_log_add(int tag, marker_type type, void* id, grpc_precise_clock_now(&entry->tm); entry->tag = tag; + entry->tagstr = tagstr; entry->type = type; entry->id = id; entry->file = file; @@ -100,28 +103,31 @@ static void grpc_timers_log_add(int tag, marker_type type, void* id, } /* Latency profiler API implementation. */ -void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { +void grpc_timer_add_mark(int tag, const char* tagstr, void* id, + const char* file, int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, MARK, id, file, line); + grpc_timers_log_add(tag, tagstr, MARK, id, file, line); } } -void grpc_timer_add_important_mark(int tag, void* id, const char* file, - int line) { +void grpc_timer_add_important_mark(int tag, const char* tagstr, void* id, + const char* file, int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, IMPORTANT, id, file, line); + grpc_timers_log_add(tag, tagstr, IMPORTANT, id, file, line); } } -void grpc_timer_begin(int tag, void* id, const char* file, int line) { +void grpc_timer_begin(int tag, const char* tagstr, void* id, const char* file, + int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, BEGIN, id, file, line); + grpc_timers_log_add(tag, tagstr, BEGIN, id, file, line); } } -void grpc_timer_end(int tag, void* id, const char* file, int line) { +void grpc_timer_end(int tag, const char* tagstr, void* id, const char* file, + int line) { if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { - grpc_timers_log_add(tag, END, id, file, line); + grpc_timers_log_add(tag, tagstr, END, id, file, line); } } diff --git a/src/core/profiling/stap_timers.c b/src/core/profiling/stap_timers.c index 064c86e794..99975163f9 100644 --- a/src/core/profiling/stap_timers.c +++ b/src/core/profiling/stap_timers.c @@ -42,20 +42,23 @@ #include "src/core/profiling/stap_probes.h" /* Latency profiler API implementation. */ -void grpc_timer_add_mark(int tag, void* id, const char* file, int line) { +void grpc_timer_add_mark(int tag, const char* tagstr, void* id, + const char* file, int line) { _STAP_ADD_MARK(tag); } -void grpc_timer_add_important_mark(int tag, void* id, const char* file, - int line) { +void grpc_timer_add_important_mark(int tag, const char* tagstr, void* id, + const char* file, int line) { _STAP_ADD_IMPORTANT_MARK(tag); } -void grpc_timer_begin(int tag, void* id, const char* file, int line) { +void grpc_timer_begin(int tag, const char* tagstr, void* id, const char* file, + int line) { _STAP_TIMING_NS_BEGIN(tag); } -void grpc_timer_end(int tag, void* id, const char* file, int line) { +void grpc_timer_end(int tag, const char* tagstr, void* id, const char* file, + int line) { _STAP_TIMING_NS_END(tag); } diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h index 4fb465c237..036d02f187 100644 --- a/src/core/profiling/timers.h +++ b/src/core/profiling/timers.h @@ -41,11 +41,14 @@ extern "C" { void grpc_timers_global_init(void); void grpc_timers_global_destroy(void); -void grpc_timer_add_mark(int tag, void *id, const char *file, int line); -void grpc_timer_add_important_mark(int tag, void *id, const char *file, - int line); -void grpc_timer_begin(int tag, void *id, const char *file, int line); -void grpc_timer_end(int tag, void *id, const char *file, int line); +void grpc_timer_add_mark(int tag, const char *tagstr, void *id, + const char *file, int line); +void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id, + const char *file, int line); +void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file, + int line); +void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file, + int line); enum grpc_profiling_tags { /* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */ @@ -103,25 +106,27 @@ enum grpc_profiling_tags { #endif /* Generic profiling interface. */ -#define GRPC_TIMER_MARK(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ +#define GRPC_TIMER_MARK(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_add_mark(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ + __LINE__); \ } -#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_add_important_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, \ - __LINE__); \ +#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_add_important_mark(tag, #tag, ((void *)(gpr_intptr)(id)), \ + __FILE__, __LINE__); \ } -#define GRPC_TIMER_BEGIN(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ +#define GRPC_TIMER_BEGIN(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_begin(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \ + __LINE__); \ } -#define GRPC_TIMER_END(tag, id) \ - if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ - grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ +#define GRPC_TIMER_END(tag, id) \ + if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \ + grpc_timer_end(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \ } #ifdef GRPC_STAP_PROFILER diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c index 61cb20f6b9..b17e0e0dfa 100644 --- a/src/core/security/security_connector.c +++ b/src/core/security/security_connector.c @@ -82,7 +82,7 @@ static const char *ssl_cipher_suites(void) { /* -- Common methods. -- */ /* Returns the first property with that name. */ -static const tsi_peer_property *tsi_peer_get_property_by_name( +const tsi_peer_property *tsi_peer_get_property_by_name( const tsi_peer *peer, const char *name) { size_t i; if (peer == NULL) return NULL; diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h index 47abe05cff..87b7ca9b8b 100644 --- a/src/core/security/security_connector.h +++ b/src/core/security/security_connector.h @@ -198,4 +198,8 @@ typedef struct { grpc_security_status grpc_ssl_server_security_connector_create( const grpc_ssl_server_config *config, grpc_security_connector **sc); +/* Util. */ +const tsi_peer_property *tsi_peer_get_property_by_name( + const tsi_peer *peer, const char *name); + #endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONNECTOR_H */ diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c index 4580537aa8..b4631fa0ed 100644 --- a/src/core/support/subprocess_posix.c +++ b/src/core/support/subprocess_posix.c @@ -55,9 +55,9 @@ struct gpr_subprocess { int joined; }; -char *gpr_subprocess_binary_extension() { return ""; } +const char *gpr_subprocess_binary_extension() { return ""; } -gpr_subprocess *gpr_subprocess_create(int argc, char **argv) { +gpr_subprocess *gpr_subprocess_create(int argc, const char **argv) { gpr_subprocess *r; int pid; char **exec_args; @@ -92,7 +92,11 @@ void gpr_subprocess_destroy(gpr_subprocess *p) { int gpr_subprocess_join(gpr_subprocess *p) { int status; +retry: if (waitpid(p->pid, &status, 0) == -1) { + if (errno == EINTR) { + goto retry; + } gpr_log(GPR_ERROR, "waitpid failed: %s", strerror(errno)); return -1; } diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 50df36cae9..4d2ba7cd7d 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) { static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } static int need_more_data(grpc_call *call) { + if (call->read_state == READ_STATE_STREAM_CLOSED) return 0; return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) || (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) || is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) || @@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) { is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) || (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) && grpc_bbq_empty(&call->incoming_queue)) || - (call->write_state == WRITE_STATE_INITIAL && !call->is_client && - call->read_state < READ_STATE_GOT_INITIAL_METADATA); + (call->write_state == WRITE_STATE_INITIAL && !call->is_client); } static void unlock(grpc_call *call) { @@ -536,9 +536,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->success) { - call->request_set[i] = REQSET_EMPTY; - } else { + call->request_set[i] = REQSET_EMPTY; + if (!master->success) { call->write_state = WRITE_STATE_WRITE_CLOSED; } break; @@ -583,11 +582,29 @@ static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { } } +static void early_out_write_ops(grpc_call *call) { + switch (call->write_state) { + case WRITE_STATE_WRITE_CLOSED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + /* fallthrough */ + case WRITE_STATE_STARTED: + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); + /* fallthrough */ + case WRITE_STATE_INITIAL: + /* do nothing */ + break; + } +} + static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); + call->write_state = WRITE_STATE_STARTED; } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); @@ -596,7 +613,13 @@ static void call_on_done_send(void *pc, int success) { finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); + call->write_state = WRITE_STATE_WRITE_CLOSED; + } + if (!success) { + call->write_state = WRITE_STATE_WRITE_CLOSED; + early_out_write_ops(call); } + call->send_ops.nops = 0; call->last_send_contains = 0; call->sending = 0; unlock(call); @@ -810,7 +833,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->send_ops = &call->send_ops; op->bind_pollset = grpc_cq_pollset(call->cq); call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA; - call->write_state = WRITE_STATE_STARTED; call->send_initial_metadata_count = 0; /* fall through intended */ case WRITE_STATE_STARTED: @@ -826,7 +848,6 @@ static int fill_send_ops(grpc_call *call, grpc_transport_op *op) { op->is_last_send = 1; op->send_ops = &call->send_ops; call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE; - call->write_state = WRITE_STATE_WRITE_CLOSED; if (!call->is_client) { /* send trailing metadata */ data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA]; @@ -918,23 +939,6 @@ static void finish_read_ops(grpc_call *call) { } } -static void early_out_write_ops(grpc_call *call) { - switch (call->write_state) { - case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); - /* fallthrough */ - case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); - /* fallthrough */ - case WRITE_STATE_INITIAL: - /* do nothing */ - break; - } -} - static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, size_t nreqs, grpc_ioreq_completion_func completion, @@ -1175,6 +1179,10 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, success); +} + +static void finish_batch_with_close(grpc_call *call, int success, void *tag) { grpc_cq_end_op(call->cq, tag, call, 1); } @@ -1185,6 +1193,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, size_t out; const grpc_op *op; grpc_ioreq *req; + void (*finish_func)(grpc_call *, int, void *) = finish_batch; GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); @@ -1268,6 +1277,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_status_on_client.trailing_metadata; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; case GRPC_OP_RECV_CLOSE_ON_SERVER: req = &reqs[out++]; @@ -1277,13 +1287,14 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, op->data.recv_close_on_server.cancelled; req = &reqs[out++]; req->op = GRPC_IOREQ_RECV_CLOSE; + finish_func = finish_batch_with_close; break; } } grpc_cq_begin_op(call->cq, call); - return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, + return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_func, tag); } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 48910afda3..8c9ca48a05 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -275,14 +275,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); memset(&ret, 0, sizeof(ret)); ret.type = GRPC_QUEUE_TIMEOUT; - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); return ret; } } gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); ret = ev->base; gpr_free(ev); - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); return ret; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 351ed5b758..d75af7291b 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) { grpc_iomgr_add_callback(kill_zombie, elem); } else if (calld->state == PENDING) { call_list_remove(calld, PENDING_START); + calld->state = ZOMBIED; + grpc_iomgr_add_callback(kill_zombie, elem); } gpr_mu_unlock(&chand->server->mu); break; diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h index ac76c4cc9c..c9e3e13042 100644 --- a/src/core/transport/chttp2/frame.h +++ b/src/core/transport/chttp2/frame.h @@ -53,12 +53,14 @@ typedef struct { gpr_uint8 send_ping_ack; gpr_uint8 process_ping_reply; gpr_uint8 goaway; + gpr_uint8 rst_stream; gpr_int64 initial_window_update; gpr_uint32 window_update; gpr_uint32 goaway_last_stream_index; gpr_uint32 goaway_error; gpr_slice goaway_text; + gpr_uint32 rst_stream_reason; } grpc_chttp2_parse_state; #define GRPC_CHTTP2_FRAME_DATA 0 diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c index 368ca86481..3016aac7a2 100644 --- a/src/core/transport/chttp2/frame_rst_stream.c +++ b/src/core/transport/chttp2/frame_rst_stream.c @@ -32,6 +32,9 @@ */ #include "src/core/transport/chttp2/frame_rst_stream.h" + +#include <grpc/support/log.h> + #include "src/core/transport/chttp2/frame.h" gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) { @@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) { return slice; } + +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( + grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) { + if (length != 4) { + gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags); + return GRPC_CHTTP2_CONNECTION_ERROR; + } + parser->byte = 0; + return GRPC_CHTTP2_PARSE_OK; +} + +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( + void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, + int is_last) { + gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice); + gpr_uint8 *const end = GPR_SLICE_END_PTR(slice); + gpr_uint8 *cur = beg; + grpc_chttp2_rst_stream_parser *p = parser; + + while (p->byte != 4 && cur != end) { + p->reason_bytes[p->byte] = *cur; + cur++; + p->byte++; + } + + if (p->byte == 4) { + GPR_ASSERT(is_last); + state->rst_stream = 1; + state->rst_stream_reason = + (((gpr_uint32)p->reason_bytes[0]) << 24) | + (((gpr_uint32)p->reason_bytes[1]) << 16) | + (((gpr_uint32)p->reason_bytes[2]) << 8) | + (((gpr_uint32)p->reason_bytes[3])); + } + + return GRPC_CHTTP2_PARSE_OK; +} diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h index 2d3ee18637..07a3c98d03 100644 --- a/src/core/transport/chttp2/frame_rst_stream.h +++ b/src/core/transport/chttp2/frame_rst_stream.h @@ -35,7 +35,18 @@ #define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H #include <grpc/support/slice.h> +#include "src/core/transport/chttp2/frame.h" + +typedef struct { + gpr_uint8 byte; + gpr_uint8 reason_bytes[4]; +} grpc_chttp2_rst_stream_parser; gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code); +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame( + grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags); +grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse( + void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last); + #endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */ diff --git a/src/core/transport/chttp2/hpack_parser.c b/src/core/transport/chttp2/hpack_parser.c index 3fd8f67226..a489543868 100644 --- a/src/core/transport/chttp2/hpack_parser.c +++ b/src/core/transport/chttp2/hpack_parser.c @@ -654,7 +654,7 @@ static int parse_stream_weight(grpc_chttp2_hpack_parser *p, return 1; } - return parse_begin(p, cur + 1, end); + return p->after_prioritization(p, cur + 1, end); } static int parse_stream_dep3(grpc_chttp2_hpack_parser *p, const gpr_uint8 *cur, @@ -1349,7 +1349,7 @@ void grpc_chttp2_hpack_parser_init(grpc_chttp2_hpack_parser *p, } void grpc_chttp2_hpack_parser_set_has_priority(grpc_chttp2_hpack_parser *p) { - GPR_ASSERT(p->state == parse_begin); + p->after_prioritization = p->state; p->state = parse_stream_dep0; } diff --git a/src/core/transport/chttp2/hpack_parser.h b/src/core/transport/chttp2/hpack_parser.h index bb4c1a1f49..bfc06b3980 100644 --- a/src/core/transport/chttp2/hpack_parser.h +++ b/src/core/transport/chttp2/hpack_parser.h @@ -62,6 +62,8 @@ struct grpc_chttp2_hpack_parser { grpc_chttp2_hpack_parser_state state; /* future states dependent on the opening op code */ const grpc_chttp2_hpack_parser_state *next_state; + /* what to do after skipping prioritization data */ + grpc_chttp2_hpack_parser_state after_prioritization; /* the value we're currently parsing */ union { gpr_uint32 *value; diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c index a6f9f782a1..9dc5f23389 100644 --- a/src/core/transport/chttp2_transport.c +++ b/src/core/transport/chttp2_transport.c @@ -154,7 +154,13 @@ typedef enum { WRITE_STATE_OPEN, WRITE_STATE_QUEUED_CLOSE, WRITE_STATE_SENT_CLOSE -} WRITE_STATE; +} write_state; + +typedef enum { + DONT_SEND_CLOSED = 0, + SEND_CLOSED, + SEND_CLOSED_WITH_RST_STREAM +} send_closed; typedef struct { stream *head; @@ -267,6 +273,7 @@ struct transport { grpc_chttp2_window_update_parser window_update; grpc_chttp2_settings_parser settings; grpc_chttp2_ping_parser ping; + grpc_chttp2_rst_stream_parser rst_stream; } simple_parsers; /* goaway */ @@ -312,8 +319,8 @@ struct stream { /* when the application requests writes be closed, the write_closed is 'queued'; when the close is flow controlled into the send path, we are 'sending' it; when the write has been performed it is 'sent' */ - WRITE_STATE write_state; - gpr_uint8 send_closed; + write_state write_state; + send_closed send_closed; gpr_uint8 read_closed; gpr_uint8 cancelled; @@ -937,7 +944,11 @@ static int prepare_write(transport *t) { if (s->write_state == WRITE_STATE_QUEUED_CLOSE && s->outgoing_sopb->nops == 0) { - s->send_closed = 1; + if (!t->is_client && !s->read_closed) { + s->send_closed = SEND_CLOSED_WITH_RST_STREAM; + } else { + s->send_closed = SEND_CLOSED; + } } if (s->writing_sopb.nops > 0 || s->send_closed) { stream_list_join(t, s, WRITING); @@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) { while ((s = stream_list_remove_head(t, WRITING))) { grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops, - s->send_closed, s->id, &t->hpack_compressor, &t->outbuf); + s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf); s->writing_sopb.nops = 0; - if (s->send_closed) { + if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) { + gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR)); + } + if (s->send_closed != DONT_SEND_CLOSED) { stream_list_join(t, s, WRITTEN_CLOSED); } } @@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) { } while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) { s->write_state = WRITE_STATE_SENT_CLOSE; - if (1||!s->cancelled) { - maybe_finish_read(t, s); + if (!t->is_client) { + s->read_closed = 1; } + maybe_finish_read(t, s); } t->outbuf.count = 0; t->outbuf.length = 0; @@ -1127,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) { if (op->recv_ops) { GPR_ASSERT(s->incoming_sopb == NULL); + GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED); s->recv_done_closure.cb = op->on_done_recv; s->recv_done_closure.user_data = op->recv_user_data; s->incoming_sopb = op->recv_ops; @@ -1214,12 +1230,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, if (s) { /* clear out any unreported input & output: nobody cares anymore */ had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0; - schedule_nuke_sopb(t, &s->parser.incoming_sopb); - if (s->outgoing_sopb) { - schedule_nuke_sopb(t, s->outgoing_sopb); - s->outgoing_sopb = NULL; - stream_list_remove(t, s, WRITABLE); - schedule_cb(t, s->send_done_closure, 0); + if (error_code != GRPC_CHTTP2_NO_ERROR) { + schedule_nuke_sopb(t, &s->parser.incoming_sopb); + if (s->outgoing_sopb) { + schedule_nuke_sopb(t, s->outgoing_sopb); + s->outgoing_sopb = NULL; + stream_list_remove(t, s, WRITABLE); + schedule_cb(t, s->send_done_closure, 0); + } } if (s->cancelled) { send_rst = 0; @@ -1228,31 +1246,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id, s->cancelled = 1; stream_list_join(t, s, CANCELLED); - gpr_ltoa(local_status, buffer); - add_incoming_metadata( - t, s, - grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); - if (!optional_message) { - switch (local_status) { - case GRPC_STATUS_CANCELLED: - add_incoming_metadata( - t, s, grpc_mdelem_from_strings(t->metadata_context, - "grpc-message", "Cancelled")); - break; - default: - break; - } - } else { + if (error_code != GRPC_CHTTP2_NO_ERROR) { + /* synthesize a status if we don't believe we'll get one */ + gpr_ltoa(local_status, buffer); add_incoming_metadata( t, s, - grpc_mdelem_from_metadata_strings( - t->metadata_context, - grpc_mdstr_from_string(t->metadata_context, "grpc-message"), - grpc_mdstr_ref(optional_message))); + grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer)); + if (!optional_message) { + switch (local_status) { + case GRPC_STATUS_CANCELLED: + add_incoming_metadata( + t, s, grpc_mdelem_from_strings(t->metadata_context, + "grpc-message", "Cancelled")); + break; + default: + break; + } + } else { + add_incoming_metadata( + t, s, + grpc_mdelem_from_metadata_strings( + t->metadata_context, + grpc_mdstr_from_string(t->metadata_context, "grpc-message"), + grpc_mdstr_ref(optional_message))); + } + add_metadata_batch(t, s); } - add_metadata_batch(t, s); - maybe_finish_read(t, s); } + maybe_finish_read(t, s); } if (!id) send_rst = 0; if (send_rst) { @@ -1527,6 +1548,19 @@ static int init_ping_parser(transport *t) { return ok; } +static int init_rst_stream_parser(transport *t) { + int ok = GRPC_CHTTP2_PARSE_OK == + grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream, + t->incoming_frame_size, + t->incoming_frame_flags); + if (!ok) { + drop_connection(t); + } + t->parser = grpc_chttp2_rst_stream_parser_parse; + t->parser_data = &t->simple_parsers.rst_stream; + return ok; +} + static int init_goaway_parser(transport *t) { int ok = GRPC_CHTTP2_PARSE_OK == @@ -1581,12 +1615,7 @@ static int init_frame_parser(transport *t) { gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame"); return 0; case GRPC_CHTTP2_FRAME_RST_STREAM: - /* TODO(ctiller): actually parse the reason */ - cancel_stream_id( - t, t->incoming_stream_id, - grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL), - GRPC_CHTTP2_CANCEL, 0); - return init_skip_frame(t, 0); + return init_rst_stream_parser(t); case GRPC_CHTTP2_FRAME_SETTINGS: return init_settings_frame_parser(t); case GRPC_CHTTP2_FRAME_WINDOW_UPDATE: @@ -1650,6 +1679,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) { if (st.goaway) { add_goaway(t, st.goaway_error, st.goaway_text); } + if (st.rst_stream) { + cancel_stream_id( + t, t->incoming_stream_id, + grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason), + st.rst_stream_reason, 0); + } if (st.process_ping_reply) { for (i = 0; i < t->ping_count; i++) { if (0 == |