aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
Diffstat (limited to 'src/core')
-rw-r--r--src/core/census/context.c2
-rw-r--r--src/core/iomgr/pollset_windows.c10
-rw-r--r--src/core/iomgr/pollset_windows.h1
-rw-r--r--src/core/surface/call.c18
-rw-r--r--src/core/surface/completion_queue.c17
-rw-r--r--src/core/surface/completion_queue.h12
6 files changed, 39 insertions, 21 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c
index 1358c5127b..df238ec98c 100644
--- a/src/core/census/context.c
+++ b/src/core/census/context.c
@@ -31,7 +31,7 @@
*
*/
-#include "context.h"
+#include "src/core/census/context.h"
#include <string.h>
#include <grpc/census.h>
diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c
index 926ee8fdd9..8d6bc79c96 100644
--- a/src/core/iomgr/pollset_windows.c
+++ b/src/core/iomgr/pollset_windows.c
@@ -48,6 +48,7 @@
won't actually do any polling, and return as quickly as possible. */
void grpc_pollset_init(grpc_pollset *pollset) {
+ memset(pollset, 0, sizeof(*pollset));
gpr_mu_init(&pollset->mu);
gpr_cv_init(&pollset->cv);
}
@@ -55,7 +56,10 @@ void grpc_pollset_init(grpc_pollset *pollset) {
void grpc_pollset_shutdown(grpc_pollset *pollset,
void (*shutdown_done)(void *arg),
void *shutdown_done_arg) {
- grpc_pollset_kick(pollset);
+ gpr_mu_lock(&pollset->mu);
+ pollset->shutting_down = 1;
+ gpr_cv_broadcast(&pollset->cv);
+ gpr_mu_unlock(&pollset->mu);
shutdown_done(shutdown_done_arg);
}
@@ -76,7 +80,9 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) {
if (grpc_alarm_check(&pollset->mu, now, &deadline)) {
return 1 /* GPR_TRUE */;
}
- gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ if (!pollset->shutting_down) {
+ gpr_cv_wait(&pollset->cv, &pollset->mu, deadline);
+ }
return 1 /* GPR_TRUE */;
}
diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h
index b4aec1b809..57a2907926 100644
--- a/src/core/iomgr/pollset_windows.h
+++ b/src/core/iomgr/pollset_windows.h
@@ -46,6 +46,7 @@
typedef struct grpc_pollset {
gpr_mu mu;
gpr_cv cv;
+ int shutting_down;
} grpc_pollset;
#define GRPC_POLLSET_MU(pollset) (&(pollset)->mu)
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 7a8eb8c54f..84ae038e46 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -158,6 +158,9 @@ struct grpc_call {
gpr_uint8 reading_message;
/* have we bound a pollset yet? */
gpr_uint8 bound_pollset;
+ /* is an error status set */
+ gpr_uint8 error_status_set;
+
/* flags with bits corresponding to write states allowing us to determine
what was sent */
gpr_uint16 last_send_contains;
@@ -216,7 +219,7 @@ struct grpc_call {
/* Received call statuses from various sources */
received_status status[STATUS_SOURCE_COUNT];
- /** Compression level for the call */
+ /* Compression level for the call */
grpc_compression_level compression_level;
/* Contexts for various subsystems (security, tracing, ...). */
@@ -417,6 +420,7 @@ static void set_status_code(grpc_call *call, status_source source,
call->status[source].is_set = 1;
call->status[source].code = status;
+ call->error_status_set = status != GRPC_STATUS_OK;
if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
grpc_bbq_flush(&call->incoming_queue);
@@ -694,13 +698,13 @@ static void call_on_done_send(void *pc, int success) {
}
static void finish_message(grpc_call *call) {
- /* TODO(ctiller): this could be a lot faster if coded directly */
- grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
- call->incoming_message.slices, call->incoming_message.count);
+ if (call->error_status_set == 0) {
+ /* TODO(ctiller): this could be a lot faster if coded directly */
+ grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create(
+ call->incoming_message.slices, call->incoming_message.count);
+ grpc_bbq_push(&call->incoming_queue, byte_buffer);
+ }
gpr_slice_buffer_reset_and_unref(&call->incoming_message);
-
- grpc_bbq_push(&call->incoming_queue, byte_buffer);
-
GPR_ASSERT(call->incoming_message.count == 0);
call->reading_message = 0;
}
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 063a23cfb1..030a8b4e6f 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -88,9 +88,11 @@ grpc_completion_queue *grpc_completion_queue_create(void) {
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) {
- gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count,
- (int)cc->owning_refs.count + 1, reason);
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s",
+ cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1,
+ reason);
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc) {
#endif
@@ -103,9 +105,11 @@ static void on_pollset_destroy_done(void *arg) {
}
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) {
- gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count,
- (int)cc->owning_refs.count - 1, reason);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line) {
+ gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s",
+ cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1,
+ reason);
#else
void grpc_cq_internal_unref(grpc_completion_queue *cc) {
#endif
@@ -208,7 +212,6 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
}
if (cc->shutdown) {
ev = create_shutdown_event();
- grpc_pollset_kick(&cc->pollset);
break;
}
if (!grpc_pollset_work(&cc->pollset, deadline)) {
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index e76910c00b..1b9010f462 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -40,10 +40,14 @@
#include <grpc/grpc.h>
#ifdef GRPC_CQ_REF_COUNT_DEBUG
-void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason);
-void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason);
-#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason)
-#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason)
+void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line);
+void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
+ const char *file, int line);
+#define GRPC_CQ_INTERNAL_REF(cc, reason) \
+ grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__)
+#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \
+ grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__)
#else
void grpc_cq_internal_ref(grpc_completion_queue *cc);
void grpc_cq_internal_unref(grpc_completion_queue *cc);