diff options
author | 2015-11-02 14:19:15 -0800 | |
---|---|---|
committer | 2015-11-02 14:19:15 -0800 | |
commit | 592560310102a0ef6354ad0556a5a717e0dc73a0 (patch) | |
tree | 49dd6a4269c1d79d93a6d0ce8801268e95d7a5b9 | |
parent | 93b944785c9fe9b6fd0d1781027fb072fe818496 (diff) |
stream_op cleanup: miscellany
-rw-r--r-- | include/grpc++/alarm.h | 2 | ||||
-rw-r--r-- | include/grpc/support/port_platform.h | 2 | ||||
-rw-r--r-- | include/grpc/support/slice_buffer.h | 5 | ||||
-rw-r--r-- | src/core/profiling/basic_timers.c | 6 | ||||
-rw-r--r-- | src/core/support/slice_buffer.c | 49 | ||||
-rw-r--r-- | src/core/support/sync_posix.c | 5 | ||||
-rw-r--r-- | src/cpp/common/alarm.cc | 8 | ||||
-rw-r--r-- | src/cpp/server/server.cc | 2 |
8 files changed, 67 insertions, 12 deletions
diff --git a/include/grpc++/alarm.h b/include/grpc++/alarm.h index 8cf7f59290..957d11c500 100644 --- a/include/grpc++/alarm.h +++ b/include/grpc++/alarm.h @@ -43,7 +43,7 @@ namespace grpc { /// A thin wrapper around \a grpc_alarm (see / \a / src/core/surface/alarm.h). -class Alarm: public GrpcLibrary { +class Alarm : public GrpcLibrary { public: /// Create a completion queue alarm instance associated to \a cq. /// diff --git a/include/grpc/support/port_platform.h b/include/grpc/support/port_platform.h index f98038da52..63e692cdb5 100644 --- a/include/grpc/support/port_platform.h +++ b/include/grpc/support/port_platform.h @@ -181,9 +181,9 @@ #ifndef _BSD_SOURCE #define _BSD_SOURCE #endif -#define GPR_FORBID_UNREACHABLE_CODE #define GPR_MSG_IOVLEN_TYPE int #if TARGET_OS_IPHONE +#define GPR_FORBID_UNREACHABLE_CODE #define GPR_PLATFORM_STRING "ios" #define GPR_CPU_IPHONE 1 #define GPR_PTHREAD_TLS 1 diff --git a/include/grpc/support/slice_buffer.h b/include/grpc/support/slice_buffer.h index e4f296c32a..f9267177ab 100644 --- a/include/grpc/support/slice_buffer.h +++ b/include/grpc/support/slice_buffer.h @@ -89,6 +89,11 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst); /* remove n bytes from the end of a slice buffer */ void gpr_slice_buffer_trim_end(gpr_slice_buffer *src, size_t n, gpr_slice_buffer *garbage); +/* move the first n bytes of src into dst */ +void gpr_slice_buffer_move_first(gpr_slice_buffer *src, size_t n, + gpr_slice_buffer *dst); +/* take the first slice in the slice buffer */ +gpr_slice gpr_slice_buffer_take_first(gpr_slice_buffer *src); #ifdef __cplusplus } diff --git a/src/core/profiling/basic_timers.c b/src/core/profiling/basic_timers.c index b49cdd07b3..527a160101 100644 --- a/src/core/profiling/basic_timers.c +++ b/src/core/profiling/basic_timers.c @@ -50,12 +50,12 @@ typedef struct gpr_timer_entry { gpr_timespec tm; const char *tagstr; const char *file; - int line; + short line; char type; gpr_uint8 important; } gpr_timer_entry; -#define MAX_COUNT (1024 * 1024 / sizeof(gpr_timer_entry)) +#define MAX_COUNT (5 * 1024 * 1024 / sizeof(gpr_timer_entry)) static __thread gpr_timer_entry g_log[MAX_COUNT]; static __thread int g_count; @@ -102,7 +102,7 @@ static void gpr_timers_log_add(const char *tagstr, marker_type type, entry->tagstr = tagstr; entry->type = type; entry->file = file; - entry->line = line; + entry->line = (short)line; entry->important = important != 0; } diff --git a/src/core/support/slice_buffer.c b/src/core/support/slice_buffer.c index 310fbe1350..17c0b6c0f6 100644 --- a/src/core/support/slice_buffer.c +++ b/src/core/support/slice_buffer.c @@ -31,6 +31,7 @@ * */ +#include <grpc/support/port_platform.h> #include <grpc/support/slice_buffer.h> #include <string.h> @@ -208,6 +209,44 @@ void gpr_slice_buffer_move_into(gpr_slice_buffer *src, gpr_slice_buffer *dst) { src->length = 0; } +void gpr_slice_buffer_move_first(gpr_slice_buffer *src, size_t n, + gpr_slice_buffer *dst) { + size_t src_idx; + size_t output_len = dst->length + n; + size_t new_input_len = src->length - n; + GPR_ASSERT(src->length >= n); + if (src->length == n) { + gpr_slice_buffer_move_into(src, dst); + return; + } + src_idx = 0; + for (;;) { + gpr_slice slice = src->slices[src_idx]; + size_t slice_len = GPR_SLICE_LENGTH(slice); + if (n > slice_len) { + gpr_slice_buffer_add(dst, slice); + n -= slice_len; + src_idx++; + } else if (n == slice_len) { + gpr_slice_buffer_add(dst, slice); + src_idx++; + break; + } else { /* n < slice_len */ + src->slices[src_idx] = gpr_slice_split_tail(&slice, n); + GPR_ASSERT(GPR_SLICE_LENGTH(slice) == n); + GPR_ASSERT(GPR_SLICE_LENGTH(src->slices[src_idx]) == slice_len - n); + gpr_slice_buffer_add(dst, slice); + break; + } + } + GPR_ASSERT(dst->length == output_len); + memmove(src->slices, src->slices + src_idx, + sizeof(gpr_slice) * (src->count - src_idx)); + src->count -= src_idx; + src->length = new_input_len; + GPR_ASSERT(src->count > 0); +} + void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n, gpr_slice_buffer *garbage) { GPR_ASSERT(n <= sb->length); @@ -231,3 +270,13 @@ void gpr_slice_buffer_trim_end(gpr_slice_buffer *sb, size_t n, } } } + +gpr_slice gpr_slice_buffer_take_first(gpr_slice_buffer *sb) { + gpr_slice slice; + GPR_ASSERT(sb->count > 0); + slice = sb->slices[0]; + memmove(&sb->slices[0], &sb->slices[1], (sb->count - 1) * sizeof(gpr_slice)); + sb->count--; + sb->length -= GPR_SLICE_LENGTH(slice); + return slice; +} diff --git a/src/core/support/sync_posix.c b/src/core/support/sync_posix.c index 39c96feb13..d6a0f7c325 100644 --- a/src/core/support/sync_posix.c +++ b/src/core/support/sync_posix.c @@ -59,8 +59,11 @@ void gpr_mu_unlock(gpr_mu* mu) { } int gpr_mu_trylock(gpr_mu* mu) { - int err = pthread_mutex_trylock(mu); + int err; + GPR_TIMER_BEGIN("gpr_mu_trylock", 0); + err = pthread_mutex_trylock(mu); GPR_ASSERT(err == 0 || err == EBUSY); + GPR_TIMER_END("gpr_mu_trylock", 0); return err == 0; } diff --git a/src/cpp/common/alarm.cc b/src/cpp/common/alarm.cc index bce0b174f8..1f0f04175e 100644 --- a/src/cpp/common/alarm.cc +++ b/src/cpp/common/alarm.cc @@ -38,12 +38,8 @@ namespace grpc { Alarm::Alarm(CompletionQueue* cq, gpr_timespec deadline, void* tag) : alarm_(grpc_alarm_create(cq->cq(), deadline, tag)) {} -Alarm::~Alarm() { - grpc_alarm_destroy(alarm_); -} +Alarm::~Alarm() { grpc_alarm_destroy(alarm_); } -void Alarm::Cancel() { - grpc_alarm_cancel(alarm_); -} +void Alarm::Cancel() { grpc_alarm_cancel(alarm_); } } // namespace grpc diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index f5063a079e..695e811654 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -388,6 +388,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { shutdown_ = true; grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest()); cq_.Shutdown(); + lock.unlock(); // Spin, eating requests until the completion queue is completely shutdown. // If the deadline expires then cancel anything that's pending and keep // spinning forever until the work is actually drained. @@ -403,6 +404,7 @@ void Server::ShutdownInternal(gpr_timespec deadline) { SyncRequest::CallData call_data(this, request); } } + lock.lock(); // Wait for running callbacks to finish. while (num_running_cb_ != 0) { |