aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/channel/http_server_filter.c1
-rw-r--r--src/core/iomgr/pollset_posix.c7
-rw-r--r--src/core/iomgr/tcp_posix.c4
-rw-r--r--src/core/profiling/basic_timers.c32
-rw-r--r--src/core/profiling/stap_timers.c13
-rw-r--r--src/core/profiling/timers.h41
-rw-r--r--src/core/security/security_connector.c2
-rw-r--r--src/core/security/security_connector.h4
-rw-r--r--src/core/support/subprocess_posix.c2
-rw-r--r--src/core/surface/call.c5
-rw-r--r--src/core/surface/completion_queue.c4
-rw-r--r--src/core/surface/server.c2
-rw-r--r--src/core/transport/chttp2/frame.h2
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.c40
-rw-r--r--src/core/transport/chttp2/frame_rst_stream.h11
-rw-r--r--src/core/transport/chttp2_transport.c117
16 files changed, 200 insertions, 87 deletions
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index 1f64df68e3..11a53b1e70 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -34,7 +34,6 @@
#include "src/core/channel/http_server_filter.h"
#include <string.h>
-#include <grpc/grpc_http.h>
#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
diff --git a/src/core/iomgr/pollset_posix.c b/src/core/iomgr/pollset_posix.c
index 4d1bcad9e2..ab1af0d4ee 100644
--- a/src/core/iomgr/pollset_posix.c
+++ b/src/core/iomgr/pollset_posix.c
@@ -258,7 +258,6 @@ static void unary_poll_do_promote(void *args, int success) {
grpc_pollset *pollset = up_args->pollset;
grpc_fd *fd = up_args->fd;
int do_shutdown_cb = 0;
- gpr_free(up_args);
/*
* This is quite tricky. There are a number of cases to keep in mind here:
@@ -273,8 +272,12 @@ static void unary_poll_do_promote(void *args, int success) {
/* First we need to ensure that nobody is polling concurrently */
while (pollset->counter != 0) {
grpc_pollset_kick(pollset);
- gpr_cv_wait(&pollset->cv, &pollset->mu, gpr_inf_future);
+ grpc_iomgr_add_callback(unary_poll_do_promote, up_args);
+ gpr_mu_unlock(&pollset->mu);
+ return;
}
+
+ gpr_free(up_args);
/* At this point the pollset may no longer be a unary poller. In that case
* we should just call the right add function and be done. */
/* TODO(klempner): If we're not careful this could cause infinite recursion.
diff --git a/src/core/iomgr/tcp_posix.c b/src/core/iomgr/tcp_posix.c
index f7dae5f86c..cd6b2ecae6 100644
--- a/src/core/iomgr/tcp_posix.c
+++ b/src/core/iomgr/tcp_posix.c
@@ -138,8 +138,10 @@ static void slice_state_remove_prefix(grpc_tcp_slice_state *state,
native "trim the first N bytes" operation to splice */
/* TODO(klempner): This really shouldn't be modifying the current slice
unless we own the slices array. */
- *current_slice = gpr_slice_split_tail(current_slice, prefix_bytes);
+ gpr_slice tail;
+ tail = gpr_slice_split_tail(current_slice, prefix_bytes);
gpr_slice_unref(*current_slice);
+ *current_slice = tail;
return;
} else {
gpr_slice_unref(*current_slice);
diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c
index 124a8d6621..ae37f584eb 100644
--- a/src/core/profiling/basic_timers.c
+++ b/src/core/profiling/basic_timers.c
@@ -55,6 +55,7 @@ typedef enum {
typedef struct grpc_timer_entry {
grpc_precise_clock tm;
int tag;
+ const char* tagstr;
marker_type type;
void* id;
const char* file;
@@ -70,18 +71,19 @@ static void log_report() {
int i;
for (i = 0; i < count; i++) {
grpc_timer_entry* entry = &(log[i]);
- printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT " %p %c %d %p %s %d\n",
+ printf("GRPC_LAT_PROF " GRPC_PRECISE_CLOCK_FORMAT
+ " %p %c %d(%s) %p %s %d\n",
GRPC_PRECISE_CLOCK_PRINTF_ARGS(&entry->tm),
(void*)(gpr_intptr)gpr_thd_currentid(), entry->type, entry->tag,
- entry->id, entry->file, entry->line);
+ entry->tagstr, entry->id, entry->file, entry->line);
}
/* Now clear out the log */
count = 0;
}
-static void grpc_timers_log_add(int tag, marker_type type, void* id,
- const char* file, int line) {
+static void grpc_timers_log_add(int tag, const char* tagstr, marker_type type,
+ void* id, const char* file, int line) {
grpc_timer_entry* entry;
/* TODO (vpai) : Improve concurrency */
@@ -93,6 +95,7 @@ static void grpc_timers_log_add(int tag, marker_type type, void* id,
grpc_precise_clock_now(&entry->tm);
entry->tag = tag;
+ entry->tagstr = tagstr;
entry->type = type;
entry->id = id;
entry->file = file;
@@ -100,28 +103,31 @@ static void grpc_timers_log_add(int tag, marker_type type, void* id,
}
/* Latency profiler API implementation. */
-void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
+void grpc_timer_add_mark(int tag, const char* tagstr, void* id,
+ const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
- grpc_timers_log_add(tag, MARK, id, file, line);
+ grpc_timers_log_add(tag, tagstr, MARK, id, file, line);
}
}
-void grpc_timer_add_important_mark(int tag, void* id, const char* file,
- int line) {
+void grpc_timer_add_important_mark(int tag, const char* tagstr, void* id,
+ const char* file, int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
- grpc_timers_log_add(tag, IMPORTANT, id, file, line);
+ grpc_timers_log_add(tag, tagstr, IMPORTANT, id, file, line);
}
}
-void grpc_timer_begin(int tag, void* id, const char* file, int line) {
+void grpc_timer_begin(int tag, const char* tagstr, void* id, const char* file,
+ int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
- grpc_timers_log_add(tag, BEGIN, id, file, line);
+ grpc_timers_log_add(tag, tagstr, BEGIN, id, file, line);
}
}
-void grpc_timer_end(int tag, void* id, const char* file, int line) {
+void grpc_timer_end(int tag, const char* tagstr, void* id, const char* file,
+ int line) {
if (tag < GRPC_PTAG_IGNORE_THRESHOLD) {
- grpc_timers_log_add(tag, END, id, file, line);
+ grpc_timers_log_add(tag, tagstr, END, id, file, line);
}
}
diff --git a/src/core/profiling/stap_timers.c b/src/core/profiling/stap_timers.c
index 064c86e794..99975163f9 100644
--- a/src/core/profiling/stap_timers.c
+++ b/src/core/profiling/stap_timers.c
@@ -42,20 +42,23 @@
#include "src/core/profiling/stap_probes.h"
/* Latency profiler API implementation. */
-void grpc_timer_add_mark(int tag, void* id, const char* file, int line) {
+void grpc_timer_add_mark(int tag, const char* tagstr, void* id,
+ const char* file, int line) {
_STAP_ADD_MARK(tag);
}
-void grpc_timer_add_important_mark(int tag, void* id, const char* file,
- int line) {
+void grpc_timer_add_important_mark(int tag, const char* tagstr, void* id,
+ const char* file, int line) {
_STAP_ADD_IMPORTANT_MARK(tag);
}
-void grpc_timer_begin(int tag, void* id, const char* file, int line) {
+void grpc_timer_begin(int tag, const char* tagstr, void* id, const char* file,
+ int line) {
_STAP_TIMING_NS_BEGIN(tag);
}
-void grpc_timer_end(int tag, void* id, const char* file, int line) {
+void grpc_timer_end(int tag, const char* tagstr, void* id, const char* file,
+ int line) {
_STAP_TIMING_NS_END(tag);
}
diff --git a/src/core/profiling/timers.h b/src/core/profiling/timers.h
index 4fb465c237..036d02f187 100644
--- a/src/core/profiling/timers.h
+++ b/src/core/profiling/timers.h
@@ -41,11 +41,14 @@ extern "C" {
void grpc_timers_global_init(void);
void grpc_timers_global_destroy(void);
-void grpc_timer_add_mark(int tag, void *id, const char *file, int line);
-void grpc_timer_add_important_mark(int tag, void *id, const char *file,
- int line);
-void grpc_timer_begin(int tag, void *id, const char *file, int line);
-void grpc_timer_end(int tag, void *id, const char *file, int line);
+void grpc_timer_add_mark(int tag, const char *tagstr, void *id,
+ const char *file, int line);
+void grpc_timer_add_important_mark(int tag, const char *tagstr, void *id,
+ const char *file, int line);
+void grpc_timer_begin(int tag, const char *tagstr, void *id, const char *file,
+ int line);
+void grpc_timer_end(int tag, const char *tagstr, void *id, const char *file,
+ int line);
enum grpc_profiling_tags {
/* Any GRPC_PTAG_* >= than the threshold won't generate any profiling mark. */
@@ -103,25 +106,27 @@ enum grpc_profiling_tags {
#endif
/* Generic profiling interface. */
-#define GRPC_TIMER_MARK(tag, id) \
- if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
- grpc_timer_add_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+#define GRPC_TIMER_MARK(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_add_mark(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \
+ __LINE__); \
}
-#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \
- if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
- grpc_timer_add_important_mark(tag, ((void *)(gpr_intptr)(id)), __FILE__, \
- __LINE__); \
+#define GRPC_TIMER_IMPORTANT_MARK(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_add_important_mark(tag, #tag, ((void *)(gpr_intptr)(id)), \
+ __FILE__, __LINE__); \
}
-#define GRPC_TIMER_BEGIN(tag, id) \
- if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
- grpc_timer_begin(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+#define GRPC_TIMER_BEGIN(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_begin(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, \
+ __LINE__); \
}
-#define GRPC_TIMER_END(tag, id) \
- if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
- grpc_timer_end(tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
+#define GRPC_TIMER_END(tag, id) \
+ if (tag < GRPC_PTAG_IGNORE_THRESHOLD) { \
+ grpc_timer_end(tag, #tag, ((void *)(gpr_intptr)(id)), __FILE__, __LINE__); \
}
#ifdef GRPC_STAP_PROFILER
diff --git a/src/core/security/security_connector.c b/src/core/security/security_connector.c
index 61cb20f6b9..b17e0e0dfa 100644
--- a/src/core/security/security_connector.c
+++ b/src/core/security/security_connector.c
@@ -82,7 +82,7 @@ static const char *ssl_cipher_suites(void) {
/* -- Common methods. -- */
/* Returns the first property with that name. */
-static const tsi_peer_property *tsi_peer_get_property_by_name(
+const tsi_peer_property *tsi_peer_get_property_by_name(
const tsi_peer *peer, const char *name) {
size_t i;
if (peer == NULL) return NULL;
diff --git a/src/core/security/security_connector.h b/src/core/security/security_connector.h
index 47abe05cff..87b7ca9b8b 100644
--- a/src/core/security/security_connector.h
+++ b/src/core/security/security_connector.h
@@ -198,4 +198,8 @@ typedef struct {
grpc_security_status grpc_ssl_server_security_connector_create(
const grpc_ssl_server_config *config, grpc_security_connector **sc);
+/* Util. */
+const tsi_peer_property *tsi_peer_get_property_by_name(
+ const tsi_peer *peer, const char *name);
+
#endif /* GRPC_INTERNAL_CORE_SECURITY_SECURITY_CONNECTOR_H */
diff --git a/src/core/support/subprocess_posix.c b/src/core/support/subprocess_posix.c
index 4580537aa8..642520bb47 100644
--- a/src/core/support/subprocess_posix.c
+++ b/src/core/support/subprocess_posix.c
@@ -55,7 +55,7 @@ struct gpr_subprocess {
int joined;
};
-char *gpr_subprocess_binary_extension() { return ""; }
+const char *gpr_subprocess_binary_extension() { return ""; }
gpr_subprocess *gpr_subprocess_create(int argc, char **argv) {
gpr_subprocess *r;
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 50df36cae9..ec6fd65ea3 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -401,6 +401,7 @@ static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
static int need_more_data(grpc_call *call) {
+ if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
(is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) && grpc_bbq_empty(&call->incoming_queue)) ||
is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
@@ -408,8 +409,7 @@ static int need_more_data(grpc_call *call) {
is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
(is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
grpc_bbq_empty(&call->incoming_queue)) ||
- (call->write_state == WRITE_STATE_INITIAL && !call->is_client &&
- call->read_state < READ_STATE_GOT_INITIAL_METADATA);
+ (call->write_state == WRITE_STATE_INITIAL && !call->is_client);
}
static void unlock(grpc_call *call) {
@@ -597,6 +597,7 @@ static void call_on_done_send(void *pc, int success) {
finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
}
+ call->send_ops.nops = 0;
call->last_send_contains = 0;
call->sending = 0;
unlock(call);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 48910afda3..8c9ca48a05 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -275,14 +275,14 @@ grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
memset(&ret, 0, sizeof(ret));
ret.type = GRPC_QUEUE_TIMEOUT;
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
ret = ev->base;
gpr_free(ev);
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
return ret;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 351ed5b758..d75af7291b 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -427,6 +427,8 @@ static void server_on_recv(void *ptr, int success) {
grpc_iomgr_add_callback(kill_zombie, elem);
} else if (calld->state == PENDING) {
call_list_remove(calld, PENDING_START);
+ calld->state = ZOMBIED;
+ grpc_iomgr_add_callback(kill_zombie, elem);
}
gpr_mu_unlock(&chand->server->mu);
break;
diff --git a/src/core/transport/chttp2/frame.h b/src/core/transport/chttp2/frame.h
index ac76c4cc9c..c9e3e13042 100644
--- a/src/core/transport/chttp2/frame.h
+++ b/src/core/transport/chttp2/frame.h
@@ -53,12 +53,14 @@ typedef struct {
gpr_uint8 send_ping_ack;
gpr_uint8 process_ping_reply;
gpr_uint8 goaway;
+ gpr_uint8 rst_stream;
gpr_int64 initial_window_update;
gpr_uint32 window_update;
gpr_uint32 goaway_last_stream_index;
gpr_uint32 goaway_error;
gpr_slice goaway_text;
+ gpr_uint32 rst_stream_reason;
} grpc_chttp2_parse_state;
#define GRPC_CHTTP2_FRAME_DATA 0
diff --git a/src/core/transport/chttp2/frame_rst_stream.c b/src/core/transport/chttp2/frame_rst_stream.c
index 368ca86481..3016aac7a2 100644
--- a/src/core/transport/chttp2/frame_rst_stream.c
+++ b/src/core/transport/chttp2/frame_rst_stream.c
@@ -32,6 +32,9 @@
*/
#include "src/core/transport/chttp2/frame_rst_stream.h"
+
+#include <grpc/support/log.h>
+
#include "src/core/transport/chttp2/frame.h"
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
@@ -54,3 +57,40 @@ gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 id, gpr_uint32 code) {
return slice;
}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags) {
+ if (length != 4) {
+ gpr_log(GPR_ERROR, "invalid rst_stream: length=%d, flags=%02x", length, flags);
+ return GRPC_CHTTP2_CONNECTION_ERROR;
+ }
+ parser->byte = 0;
+ return GRPC_CHTTP2_PARSE_OK;
+}
+
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice,
+ int is_last) {
+ gpr_uint8 *const beg = GPR_SLICE_START_PTR(slice);
+ gpr_uint8 *const end = GPR_SLICE_END_PTR(slice);
+ gpr_uint8 *cur = beg;
+ grpc_chttp2_rst_stream_parser *p = parser;
+
+ while (p->byte != 4 && cur != end) {
+ p->reason_bytes[p->byte] = *cur;
+ cur++;
+ p->byte++;
+ }
+
+ if (p->byte == 4) {
+ GPR_ASSERT(is_last);
+ state->rst_stream = 1;
+ state->rst_stream_reason =
+ (((gpr_uint32)p->reason_bytes[0]) << 24) |
+ (((gpr_uint32)p->reason_bytes[1]) << 16) |
+ (((gpr_uint32)p->reason_bytes[2]) << 8) |
+ (((gpr_uint32)p->reason_bytes[3]));
+ }
+
+ return GRPC_CHTTP2_PARSE_OK;
+}
diff --git a/src/core/transport/chttp2/frame_rst_stream.h b/src/core/transport/chttp2/frame_rst_stream.h
index 2d3ee18637..07a3c98d03 100644
--- a/src/core/transport/chttp2/frame_rst_stream.h
+++ b/src/core/transport/chttp2/frame_rst_stream.h
@@ -35,7 +35,18 @@
#define GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H
#include <grpc/support/slice.h>
+#include "src/core/transport/chttp2/frame.h"
+
+typedef struct {
+ gpr_uint8 byte;
+ gpr_uint8 reason_bytes[4];
+} grpc_chttp2_rst_stream_parser;
gpr_slice grpc_chttp2_rst_stream_create(gpr_uint32 stream_id, gpr_uint32 code);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_begin_frame(
+ grpc_chttp2_rst_stream_parser *parser, gpr_uint32 length, gpr_uint8 flags);
+grpc_chttp2_parse_error grpc_chttp2_rst_stream_parser_parse(
+ void *parser, grpc_chttp2_parse_state *state, gpr_slice slice, int is_last);
+
#endif /* GRPC_INTERNAL_CORE_TRANSPORT_CHTTP2_FRAME_RST_STREAM_H */
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index a6f9f782a1..9dc5f23389 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -154,7 +154,13 @@ typedef enum {
WRITE_STATE_OPEN,
WRITE_STATE_QUEUED_CLOSE,
WRITE_STATE_SENT_CLOSE
-} WRITE_STATE;
+} write_state;
+
+typedef enum {
+ DONT_SEND_CLOSED = 0,
+ SEND_CLOSED,
+ SEND_CLOSED_WITH_RST_STREAM
+} send_closed;
typedef struct {
stream *head;
@@ -267,6 +273,7 @@ struct transport {
grpc_chttp2_window_update_parser window_update;
grpc_chttp2_settings_parser settings;
grpc_chttp2_ping_parser ping;
+ grpc_chttp2_rst_stream_parser rst_stream;
} simple_parsers;
/* goaway */
@@ -312,8 +319,8 @@ struct stream {
/* when the application requests writes be closed, the write_closed is
'queued'; when the close is flow controlled into the send path, we are
'sending' it; when the write has been performed it is 'sent' */
- WRITE_STATE write_state;
- gpr_uint8 send_closed;
+ write_state write_state;
+ send_closed send_closed;
gpr_uint8 read_closed;
gpr_uint8 cancelled;
@@ -937,7 +944,11 @@ static int prepare_write(transport *t) {
if (s->write_state == WRITE_STATE_QUEUED_CLOSE &&
s->outgoing_sopb->nops == 0) {
- s->send_closed = 1;
+ if (!t->is_client && !s->read_closed) {
+ s->send_closed = SEND_CLOSED_WITH_RST_STREAM;
+ } else {
+ s->send_closed = SEND_CLOSED;
+ }
}
if (s->writing_sopb.nops > 0 || s->send_closed) {
stream_list_join(t, s, WRITING);
@@ -982,9 +993,12 @@ static void finalize_outbuf(transport *t) {
while ((s = stream_list_remove_head(t, WRITING))) {
grpc_chttp2_encode(s->writing_sopb.ops, s->writing_sopb.nops,
- s->send_closed, s->id, &t->hpack_compressor, &t->outbuf);
+ s->send_closed != DONT_SEND_CLOSED, s->id, &t->hpack_compressor, &t->outbuf);
s->writing_sopb.nops = 0;
- if (s->send_closed) {
+ if (s->send_closed == SEND_CLOSED_WITH_RST_STREAM) {
+ gpr_slice_buffer_add(&t->outbuf, grpc_chttp2_rst_stream_create(s->id, GRPC_CHTTP2_NO_ERROR));
+ }
+ if (s->send_closed != DONT_SEND_CLOSED) {
stream_list_join(t, s, WRITTEN_CLOSED);
}
}
@@ -999,9 +1013,10 @@ static void finish_write_common(transport *t, int success) {
}
while ((s = stream_list_remove_head(t, WRITTEN_CLOSED))) {
s->write_state = WRITE_STATE_SENT_CLOSE;
- if (1||!s->cancelled) {
- maybe_finish_read(t, s);
+ if (!t->is_client) {
+ s->read_closed = 1;
}
+ maybe_finish_read(t, s);
}
t->outbuf.count = 0;
t->outbuf.length = 0;
@@ -1127,6 +1142,7 @@ static void perform_op_locked(transport *t, stream *s, grpc_transport_op *op) {
if (op->recv_ops) {
GPR_ASSERT(s->incoming_sopb == NULL);
+ GPR_ASSERT(s->published_state != GRPC_STREAM_CLOSED);
s->recv_done_closure.cb = op->on_done_recv;
s->recv_done_closure.user_data = op->recv_user_data;
s->incoming_sopb = op->recv_ops;
@@ -1214,12 +1230,14 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
if (s) {
/* clear out any unreported input & output: nobody cares anymore */
had_outgoing = s->outgoing_sopb && s->outgoing_sopb->nops != 0;
- schedule_nuke_sopb(t, &s->parser.incoming_sopb);
- if (s->outgoing_sopb) {
- schedule_nuke_sopb(t, s->outgoing_sopb);
- s->outgoing_sopb = NULL;
- stream_list_remove(t, s, WRITABLE);
- schedule_cb(t, s->send_done_closure, 0);
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ schedule_nuke_sopb(t, &s->parser.incoming_sopb);
+ if (s->outgoing_sopb) {
+ schedule_nuke_sopb(t, s->outgoing_sopb);
+ s->outgoing_sopb = NULL;
+ stream_list_remove(t, s, WRITABLE);
+ schedule_cb(t, s->send_done_closure, 0);
+ }
}
if (s->cancelled) {
send_rst = 0;
@@ -1228,31 +1246,34 @@ static void cancel_stream_inner(transport *t, stream *s, gpr_uint32 id,
s->cancelled = 1;
stream_list_join(t, s, CANCELLED);
- gpr_ltoa(local_status, buffer);
- add_incoming_metadata(
- t, s,
- grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
- if (!optional_message) {
- switch (local_status) {
- case GRPC_STATUS_CANCELLED:
- add_incoming_metadata(
- t, s, grpc_mdelem_from_strings(t->metadata_context,
- "grpc-message", "Cancelled"));
- break;
- default:
- break;
- }
- } else {
+ if (error_code != GRPC_CHTTP2_NO_ERROR) {
+ /* synthesize a status if we don't believe we'll get one */
+ gpr_ltoa(local_status, buffer);
add_incoming_metadata(
t, s,
- grpc_mdelem_from_metadata_strings(
- t->metadata_context,
- grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
- grpc_mdstr_ref(optional_message)));
+ grpc_mdelem_from_strings(t->metadata_context, "grpc-status", buffer));
+ if (!optional_message) {
+ switch (local_status) {
+ case GRPC_STATUS_CANCELLED:
+ add_incoming_metadata(
+ t, s, grpc_mdelem_from_strings(t->metadata_context,
+ "grpc-message", "Cancelled"));
+ break;
+ default:
+ break;
+ }
+ } else {
+ add_incoming_metadata(
+ t, s,
+ grpc_mdelem_from_metadata_strings(
+ t->metadata_context,
+ grpc_mdstr_from_string(t->metadata_context, "grpc-message"),
+ grpc_mdstr_ref(optional_message)));
+ }
+ add_metadata_batch(t, s);
}
- add_metadata_batch(t, s);
- maybe_finish_read(t, s);
}
+ maybe_finish_read(t, s);
}
if (!id) send_rst = 0;
if (send_rst) {
@@ -1527,6 +1548,19 @@ static int init_ping_parser(transport *t) {
return ok;
}
+static int init_rst_stream_parser(transport *t) {
+ int ok = GRPC_CHTTP2_PARSE_OK ==
+ grpc_chttp2_rst_stream_parser_begin_frame(&t->simple_parsers.rst_stream,
+ t->incoming_frame_size,
+ t->incoming_frame_flags);
+ if (!ok) {
+ drop_connection(t);
+ }
+ t->parser = grpc_chttp2_rst_stream_parser_parse;
+ t->parser_data = &t->simple_parsers.rst_stream;
+ return ok;
+}
+
static int init_goaway_parser(transport *t) {
int ok =
GRPC_CHTTP2_PARSE_OK ==
@@ -1581,12 +1615,7 @@ static int init_frame_parser(transport *t) {
gpr_log(GPR_ERROR, "Unexpected CONTINUATION frame");
return 0;
case GRPC_CHTTP2_FRAME_RST_STREAM:
- /* TODO(ctiller): actually parse the reason */
- cancel_stream_id(
- t, t->incoming_stream_id,
- grpc_chttp2_http2_error_to_grpc_status(GRPC_CHTTP2_CANCEL),
- GRPC_CHTTP2_CANCEL, 0);
- return init_skip_frame(t, 0);
+ return init_rst_stream_parser(t);
case GRPC_CHTTP2_FRAME_SETTINGS:
return init_settings_frame_parser(t);
case GRPC_CHTTP2_FRAME_WINDOW_UPDATE:
@@ -1650,6 +1679,12 @@ static int parse_frame_slice(transport *t, gpr_slice slice, int is_last) {
if (st.goaway) {
add_goaway(t, st.goaway_error, st.goaway_text);
}
+ if (st.rst_stream) {
+ cancel_stream_id(
+ t, t->incoming_stream_id,
+ grpc_chttp2_http2_error_to_grpc_status(st.rst_stream_reason),
+ st.rst_stream_reason, 0);
+ }
if (st.process_ping_reply) {
for (i = 0; i < t->ping_count; i++) {
if (0 ==