diff options
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/census/context.c | 2 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.c | 10 | ||||
-rw-r--r-- | src/core/iomgr/pollset_windows.h | 1 | ||||
-rw-r--r-- | src/core/surface/call.c | 18 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 17 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 12 |
6 files changed, 39 insertions, 21 deletions
diff --git a/src/core/census/context.c b/src/core/census/context.c index 1358c5127b..df238ec98c 100644 --- a/src/core/census/context.c +++ b/src/core/census/context.c @@ -31,7 +31,7 @@ * */ -#include "context.h" +#include "src/core/census/context.h" #include <string.h> #include <grpc/census.h> diff --git a/src/core/iomgr/pollset_windows.c b/src/core/iomgr/pollset_windows.c index 926ee8fdd9..8d6bc79c96 100644 --- a/src/core/iomgr/pollset_windows.c +++ b/src/core/iomgr/pollset_windows.c @@ -48,6 +48,7 @@ won't actually do any polling, and return as quickly as possible. */ void grpc_pollset_init(grpc_pollset *pollset) { + memset(pollset, 0, sizeof(*pollset)); gpr_mu_init(&pollset->mu); gpr_cv_init(&pollset->cv); } @@ -55,7 +56,10 @@ void grpc_pollset_init(grpc_pollset *pollset) { void grpc_pollset_shutdown(grpc_pollset *pollset, void (*shutdown_done)(void *arg), void *shutdown_done_arg) { - grpc_pollset_kick(pollset); + gpr_mu_lock(&pollset->mu); + pollset->shutting_down = 1; + gpr_cv_broadcast(&pollset->cv); + gpr_mu_unlock(&pollset->mu); shutdown_done(shutdown_done_arg); } @@ -76,7 +80,9 @@ int grpc_pollset_work(grpc_pollset *pollset, gpr_timespec deadline) { if (grpc_alarm_check(&pollset->mu, now, &deadline)) { return 1 /* GPR_TRUE */; } - gpr_cv_wait(&pollset->cv, &pollset->mu, deadline); + if (!pollset->shutting_down) { + gpr_cv_wait(&pollset->cv, &pollset->mu, deadline); + } return 1 /* GPR_TRUE */; } diff --git a/src/core/iomgr/pollset_windows.h b/src/core/iomgr/pollset_windows.h index b4aec1b809..57a2907926 100644 --- a/src/core/iomgr/pollset_windows.h +++ b/src/core/iomgr/pollset_windows.h @@ -46,6 +46,7 @@ typedef struct grpc_pollset { gpr_mu mu; gpr_cv cv; + int shutting_down; } grpc_pollset; #define GRPC_POLLSET_MU(pollset) (&(pollset)->mu) diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 7a8eb8c54f..84ae038e46 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -158,6 +158,9 @@ struct grpc_call { gpr_uint8 reading_message; /* have we bound a pollset yet? */ gpr_uint8 bound_pollset; + /* is an error status set */ + gpr_uint8 error_status_set; + /* flags with bits corresponding to write states allowing us to determine what was sent */ gpr_uint16 last_send_contains; @@ -216,7 +219,7 @@ struct grpc_call { /* Received call statuses from various sources */ received_status status[STATUS_SOURCE_COUNT]; - /** Compression level for the call */ + /* Compression level for the call */ grpc_compression_level compression_level; /* Contexts for various subsystems (security, tracing, ...). */ @@ -417,6 +420,7 @@ static void set_status_code(grpc_call *call, status_source source, call->status[source].is_set = 1; call->status[source].code = status; + call->error_status_set = status != GRPC_STATUS_OK; if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) { grpc_bbq_flush(&call->incoming_queue); @@ -694,13 +698,13 @@ static void call_on_done_send(void *pc, int success) { } static void finish_message(grpc_call *call) { - /* TODO(ctiller): this could be a lot faster if coded directly */ - grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( - call->incoming_message.slices, call->incoming_message.count); + if (call->error_status_set == 0) { + /* TODO(ctiller): this could be a lot faster if coded directly */ + grpc_byte_buffer *byte_buffer = grpc_raw_byte_buffer_create( + call->incoming_message.slices, call->incoming_message.count); + grpc_bbq_push(&call->incoming_queue, byte_buffer); + } gpr_slice_buffer_reset_and_unref(&call->incoming_message); - - grpc_bbq_push(&call->incoming_queue, byte_buffer); - GPR_ASSERT(call->incoming_message.count == 0); call->reading_message = 0; } diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 063a23cfb1..030a8b4e6f 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -88,9 +88,11 @@ grpc_completion_queue *grpc_completion_queue_create(void) { } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p ref %d -> %d %s", cc, (int)cc->owning_refs.count, - (int)cc->owning_refs.count + 1, reason); +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", + cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, + reason); #else void grpc_cq_internal_ref(grpc_completion_queue *cc) { #endif @@ -103,9 +105,11 @@ static void on_pollset_destroy_done(void *arg) { } #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason) { - gpr_log(GPR_DEBUG, "CQ:%p unref %d -> %d %s", cc, (int)cc->owning_refs.count, - (int)cc->owning_refs.count - 1, reason); +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line) { + gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", + cc, (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, + reason); #else void grpc_cq_internal_unref(grpc_completion_queue *cc) { #endif @@ -208,7 +212,6 @@ grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, } if (cc->shutdown) { ev = create_shutdown_event(); - grpc_pollset_kick(&cc->pollset); break; } if (!grpc_pollset_work(&cc->pollset, deadline)) { diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index e76910c00b..1b9010f462 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -40,10 +40,14 @@ #include <grpc/grpc.h> #ifdef GRPC_CQ_REF_COUNT_DEBUG -void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason); -void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason); -#define GRPC_CQ_INTERNAL_REF(cc, reason) grpc_cq_internal_ref(cc, reason) -#define GRPC_CQ_INTERNAL_UNREF(cc, reason) grpc_cq_internal_unref(cc, reason) +void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason, + const char *file, int line); +void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason, + const char *file, int line); +#define GRPC_CQ_INTERNAL_REF(cc, reason) \ + grpc_cq_internal_ref(cc, reason, __FILE__, __LINE__) +#define GRPC_CQ_INTERNAL_UNREF(cc, reason) \ + grpc_cq_internal_unref(cc, reason, __FILE__, __LINE__) #else void grpc_cq_internal_ref(grpc_completion_queue *cc); void grpc_cq_internal_unref(grpc_completion_queue *cc); |