diff options
author | Sree Kuchibhotla <sreek@google.com> | 2018-04-11 15:26:56 -0700 |
---|---|---|
committer | Sree Kuchibhotla <sreek@google.com> | 2018-04-11 15:26:56 -0700 |
commit | 7b011b296e252403cd47dc36a20cb4b8cdcaba5b (patch) | |
tree | da0e08c9bb990fe00d9640739973ab4a67e31d60 /src | |
parent | f2f5a9a0a75b2eba8d950345aeec89b4275f7b59 (diff) | |
parent | 71e8aee5e7861134fa8c5d49d41d569df981e88a (diff) |
Merge branch 'master' into fix-time
Diffstat (limited to 'src')
50 files changed, 372 insertions, 264 deletions
diff --git a/src/compiler/objective_c_generator.cc b/src/compiler/objective_c_generator.cc index ffdeb8f6b0..39f68cb956 100644 --- a/src/compiler/objective_c_generator.cc +++ b/src/compiler/objective_c_generator.cc @@ -206,7 +206,7 @@ void PrintMethodImplementations(Printer* printer, } } for (auto one_class : classes) { - output += " @class " + one_class + ";\n"; + output += "@class " + one_class + ";\n"; } return output; diff --git a/src/compiler/objective_c_plugin.cc b/src/compiler/objective_c_plugin.cc index 76703d79cd..f0fe3688cc 100644 --- a/src/compiler/objective_c_plugin.cc +++ b/src/compiler/objective_c_plugin.cc @@ -118,11 +118,11 @@ class ObjectiveCGrpcGenerator : public grpc::protobuf::compiler::CodeGenerator { Write(context, file_name + ".pbrpc.h", PreprocIfNot(kForwardDeclare, imports) + "\n" + PreprocIfNot(kProtocolOnly, system_imports) + "\n" + - PreprocIfElse(kForwardDeclare, class_declarations, - class_imports) + - "\n" + forward_declarations + "\n" + kNonNullBegin + "\n" + - protocols + "\n" + PreprocIfNot(kProtocolOnly, interfaces) + - "\n" + kNonNullEnd + "\n"); + class_declarations + "\n" + + PreprocIfNot(kForwardDeclare, class_imports) + "\n" + + forward_declarations + "\n" + kNonNullBegin + "\n" + protocols + + "\n" + PreprocIfNot(kProtocolOnly, interfaces) + "\n" + + kNonNullEnd + "\n"); } { diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc index 51f9ae000a..a10bfea8b1 100644 --- a/src/core/ext/filters/client_channel/client_channel.cc +++ b/src/core/ext/filters/client_channel/client_channel.cc @@ -924,7 +924,9 @@ typedef struct client_channel_call_data { // Note: We inline the cache for the first 3 send_message ops and use // dynamic allocation after that. This number was essentially picked // at random; it could be changed in the future to tune performance. - grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3> send_messages; + grpc_core::ManualConstructor< + grpc_core::InlinedVector<grpc_core::ByteStreamCache*, 3>> + send_messages; // send_trailing_metadata bool seen_send_trailing_metadata; grpc_linked_mdelem* send_trailing_metadata_storage; @@ -974,7 +976,7 @@ static void maybe_cache_send_ops_for_batch(call_data* calld, gpr_arena_alloc(calld->arena, sizeof(grpc_core::ByteStreamCache))); new (cache) grpc_core::ByteStreamCache( std::move(batch->payload->send_message.send_message)); - calld->send_messages.push_back(cache); + calld->send_messages->push_back(cache); } // Save metadata batch for send_trailing_metadata ops. if (batch->send_trailing_metadata) { @@ -1008,7 +1010,7 @@ static void free_cached_send_op_data_after_commit( "]", chand, calld, i); } - calld->send_messages[i]->Destroy(); + (*calld->send_messages)[i]->Destroy(); } if (retry_state->completed_send_trailing_metadata) { grpc_metadata_batch_destroy(&calld->send_trailing_metadata); @@ -1032,7 +1034,7 @@ static void free_cached_send_op_data_for_completed_batch( "]", chand, calld, retry_state->completed_send_message_count - 1); } - calld->send_messages[retry_state->completed_send_message_count - 1] + (*calld->send_messages)[retry_state->completed_send_message_count - 1] ->Destroy(); } if (batch_data->batch.send_trailing_metadata) { @@ -1280,7 +1282,8 @@ static bool pending_batch_is_completed( return false; } if (pending->batch->send_message && - retry_state->completed_send_message_count < calld->send_messages.size()) { + retry_state->completed_send_message_count < + calld->send_messages->size()) { return false; } if (pending->batch->send_trailing_metadata && @@ -1315,7 +1318,7 @@ static bool pending_batch_is_unstarted( return true; } if (pending->batch->send_message && - retry_state->started_send_message_count < calld->send_messages.size()) { + retry_state->started_send_message_count < calld->send_messages->size()) { return true; } if (pending->batch->send_trailing_metadata && @@ -1817,7 +1820,7 @@ static void add_closures_for_replay_or_pending_send_ops( channel_data* chand = static_cast<channel_data*>(elem->channel_data); call_data* calld = static_cast<call_data*>(elem->call_data); bool have_pending_send_message_ops = - retry_state->started_send_message_count < calld->send_messages.size(); + retry_state->started_send_message_count < calld->send_messages->size(); bool have_pending_send_trailing_metadata_op = calld->seen_send_trailing_metadata && !retry_state->started_send_trailing_metadata; @@ -2133,7 +2136,7 @@ static void add_retriable_send_message_op( chand, calld, retry_state->started_send_message_count); } grpc_core::ByteStreamCache* cache = - calld->send_messages[retry_state->started_send_message_count]; + (*calld->send_messages)[retry_state->started_send_message_count]; ++retry_state->started_send_message_count; batch_data->send_message.Init(cache); batch_data->batch.send_message = true; @@ -2254,7 +2257,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( } // send_message. // Note that we can only have one send_message op in flight at a time. - if (retry_state->started_send_message_count < calld->send_messages.size() && + if (retry_state->started_send_message_count < calld->send_messages->size() && retry_state->started_send_message_count == retry_state->completed_send_message_count && !calld->pending_send_message) { @@ -2274,7 +2277,7 @@ static subchannel_batch_data* maybe_create_subchannel_batch_for_replay( // to start, since we can't send down any more send_message ops after // send_trailing_metadata. if (calld->seen_send_trailing_metadata && - retry_state->started_send_message_count == calld->send_messages.size() && + retry_state->started_send_message_count == calld->send_messages->size() && !retry_state->started_send_trailing_metadata && !calld->pending_send_trailing_metadata) { if (grpc_client_channel_trace.enabled()) { @@ -2325,7 +2328,7 @@ static void add_subchannel_batches_for_pending_batches( // send_message ops after send_trailing_metadata. if (batch->send_trailing_metadata && (retry_state->started_send_message_count + batch->send_message < - calld->send_messages.size() || + calld->send_messages->size() || retry_state->started_send_trailing_metadata)) { continue; } @@ -2976,6 +2979,7 @@ static grpc_error* cc_init_call_elem(grpc_call_element* elem, calld->deadline); } calld->enable_retries = chand->enable_retries; + calld->send_messages.Init(); return GRPC_ERROR_NONE; } @@ -3011,6 +3015,7 @@ static void cc_destroy_call_elem(grpc_call_element* elem, calld->pick.subchannel_call_context[i].value); } } + calld->send_messages.Destroy(); GRPC_CLOSURE_SCHED(then_schedule_closure, GRPC_ERROR_NONE); } diff --git a/src/core/lib/debug/trace.cc b/src/core/lib/debug/trace.cc index b0e0f2ba7c..01c1e867d9 100644 --- a/src/core/lib/debug/trace.cc +++ b/src/core/lib/debug/trace.cc @@ -55,7 +55,8 @@ bool TraceFlagList::Set(const char* name, bool enabled) { found = true; } } - if (!found) { + // check for unknowns, but ignore "", to allow to GRPC_TRACE= + if (!found && 0 != strcmp(name, "")) { gpr_log(GPR_ERROR, "Unknown trace var: '%s'", name); return false; /* early return */ } diff --git a/src/core/lib/gprpp/inlined_vector.h b/src/core/lib/gprpp/inlined_vector.h index ca95aecddc..f36f6cb706 100644 --- a/src/core/lib/gprpp/inlined_vector.h +++ b/src/core/lib/gprpp/inlined_vector.h @@ -54,43 +54,43 @@ class InlinedVector { InlinedVector(const InlinedVector&) = delete; InlinedVector& operator=(const InlinedVector&) = delete; + T* data() { + return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<T*>(inline_); + } + + const T* data() const { + return dynamic_ != nullptr ? dynamic_ : reinterpret_cast<const T*>(inline_); + } + T& operator[](size_t offset) { assert(offset < size_); - if (offset < N) { - return *reinterpret_cast<T*>(inline_ + offset); - } else { - return dynamic_[offset - N]; - } + return data()[offset]; } const T& operator[](size_t offset) const { assert(offset < size_); - if (offset < N) { - return *reinterpret_cast<const T*>(inline_ + offset); - } else { - return dynamic_[offset - N]; + return data()[offset]; + } + + void reserve(size_t capacity) { + if (capacity > capacity_) { + T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * capacity)); + for (size_t i = 0; i < size_; ++i) { + new (&new_dynamic[i]) T(std::move(data()[i])); + data()[i].~T(); + } + gpr_free(dynamic_); + dynamic_ = new_dynamic; + capacity_ = capacity; } } template <typename... Args> void emplace_back(Args&&... args) { - if (size_ < N) { - new (&inline_[size_]) T(std::forward<Args>(args)...); - } else { - if (size_ - N == dynamic_capacity_) { - size_t new_capacity = - dynamic_capacity_ == 0 ? 2 : dynamic_capacity_ * 2; - T* new_dynamic = static_cast<T*>(gpr_malloc(sizeof(T) * new_capacity)); - for (size_t i = 0; i < dynamic_capacity_; ++i) { - new (&new_dynamic[i]) T(std::move(dynamic_[i])); - dynamic_[i].~T(); - } - gpr_free(dynamic_); - dynamic_ = new_dynamic; - dynamic_capacity_ = new_capacity; - } - new (&dynamic_[size_ - N]) T(std::forward<Args>(args)...); + if (size_ == capacity_) { + reserve(capacity_ * 2); } + new (&(data()[size_])) T(std::forward<Args>(args)...); ++size_; } @@ -99,6 +99,7 @@ class InlinedVector { void push_back(T&& value) { emplace_back(std::move(value)); } size_t size() const { return size_; } + size_t capacity() const { return capacity_; } void clear() { destroy_elements(); @@ -109,26 +110,21 @@ class InlinedVector { void init_data() { dynamic_ = nullptr; size_ = 0; - dynamic_capacity_ = 0; + capacity_ = N; } void destroy_elements() { - for (size_t i = 0; i < size_ && i < N; ++i) { - T& value = *reinterpret_cast<T*>(inline_ + i); + for (size_t i = 0; i < size_; ++i) { + T& value = data()[i]; value.~T(); } - if (size_ > N) { // Avoid subtracting two signed values. - for (size_t i = 0; i < size_ - N; ++i) { - dynamic_[i].~T(); - } - } gpr_free(dynamic_); } typename std::aligned_storage<sizeof(T)>::type inline_[N]; T* dynamic_; size_t size_; - size_t dynamic_capacity_; + size_t capacity_; }; } // namespace grpc_core diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 0ef7c03056..44d8cf2b1e 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -59,7 +59,10 @@ //#define GRPC_EPOLLEX_CREATE_WORKERS_ON_HEAP 1 #define MAX_EPOLL_EVENTS 100 -#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 1 +// TODO(juanlishen): We use a greater-than-one value here as a workaround fix to +// a keepalive ping timeout issue. We may want to revert https://github +// .com/grpc/grpc/pull/14943 once we figure out the root cause. +#define MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL 16 grpc_core::DebugOnlyTraceFlag grpc_trace_pollable_refcount(false, "pollable_refcount"); @@ -198,6 +201,7 @@ struct grpc_pollset_worker { struct grpc_pollset { gpr_mu mu; + gpr_atm worker_count; pollable* active_pollable; bool kicked_without_poller; grpc_closure* shutdown_closure; @@ -685,6 +689,7 @@ static grpc_error* pollset_kick_all(grpc_pollset* pollset) { static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { gpr_mu_init(&pollset->mu); + gpr_atm_no_barrier_store(&pollset->worker_count, 0); pollset->active_pollable = POLLABLE_REF(g_empty_pollable, "pollset"); pollset->kicked_without_poller = false; pollset->shutdown_closure = nullptr; @@ -758,8 +763,20 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, pollable* pollable_obj, bool drain) { GPR_TIMER_SCOPE("pollable_process_events", 0); static const char* err_desc = "pollset_process_events"; + // Use a simple heuristic to determine how many fd events to process + // per loop iteration. (events/workers) + int handle_count = 1; + int worker_count = gpr_atm_no_barrier_load(&pollset->worker_count); + GPR_ASSERT(worker_count > 0); + handle_count = + (pollable_obj->event_count - pollable_obj->event_cursor) / worker_count; + if (handle_count == 0) { + handle_count = 1; + } else if (handle_count > MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) { + handle_count = MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL; + } grpc_error* error = GRPC_ERROR_NONE; - for (int i = 0; (drain || i < MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL) && + for (int i = 0; (drain || i < handle_count) && pollable_obj->event_cursor != pollable_obj->event_count; i++) { int n = pollable_obj->event_cursor++; @@ -884,6 +901,7 @@ static bool begin_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, GPR_TIMER_SCOPE("begin_worker", 0); bool do_poll = (pollset->shutdown_closure == nullptr && !pollset->already_shutdown); + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, 1); if (worker_hdl != nullptr) *worker_hdl = worker; worker->initialized_cv = false; worker->kicked = false; @@ -964,6 +982,7 @@ static void end_worker(grpc_pollset* pollset, grpc_pollset_worker* worker, if (worker->initialized_cv) { gpr_cv_destroy(&worker->cv); } + gpr_atm_no_barrier_fetch_add(&pollset->worker_count, -1); } #ifndef NDEBUG diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 2e375b4022..d9aba9b6a3 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -1530,6 +1530,12 @@ static void run_poll(void* args) { // This function overrides poll() to handle condition variable wakeup fds static int cvfd_poll(struct pollfd* fds, nfds_t nfds, int timeout) { + if (timeout == 0) { + // Don't bother using background threads for polling if timeout is 0, + // poll-cv might not wait for a poll to return otherwise. + // https://github.com/grpc/grpc/issues/13298 + return poll(fds, nfds, 0); + } unsigned int i; int res, idx; grpc_cv_node* pollcv; diff --git a/src/core/lib/iomgr/socket_utils_linux.cc b/src/core/lib/iomgr/socket_utils_linux.cc index b0207578de..34f93cc4b0 100644 --- a/src/core/lib/iomgr/socket_utils_linux.cc +++ b/src/core/lib/iomgr/socket_utils_linux.cc @@ -33,7 +33,6 @@ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec) { int flags = 0; - GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t)); flags |= nonblock ? SOCK_NONBLOCK : 0; flags |= cloexec ? SOCK_CLOEXEC : 0; return accept4(sockfd, reinterpret_cast<grpc_sockaddr*>(resolved_addr->addr), diff --git a/src/core/lib/iomgr/socket_utils_posix.cc b/src/core/lib/iomgr/socket_utils_posix.cc index 2a49583ac4..c48da52ffb 100644 --- a/src/core/lib/iomgr/socket_utils_posix.cc +++ b/src/core/lib/iomgr/socket_utils_posix.cc @@ -34,9 +34,8 @@ int grpc_accept4(int sockfd, grpc_resolved_address* resolved_addr, int nonblock, int cloexec) { int fd, flags; - GPR_ASSERT(sizeof(socklen_t) <= sizeof(size_t)); - fd = accept(sockfd, (grpc_sockaddr*)resolved_addr->addr, - (socklen_t*)&resolved_addr->len); + fd = accept(sockfd, reinterpret_cast<grpc_sockaddr*>(resolved_addr->addr), + &resolved_addr->len); if (fd >= 0) { if (nonblock) { flags = fcntl(fd, F_GETFL, 0); diff --git a/src/core/lib/iomgr/socket_utils_uv.cc b/src/core/lib/iomgr/socket_utils_uv.cc index 8538abc7e4..7eba40c46b 100644 --- a/src/core/lib/iomgr/socket_utils_uv.cc +++ b/src/core/lib/iomgr/socket_utils_uv.cc @@ -38,8 +38,8 @@ int grpc_inet_pton(int af, const char* src, void* dst) { } const char* grpc_inet_ntop(int af, const void* src, char* dst, size_t size) { - /* Windows InetNtopA wants a mutable ip pointer */ - return inet_ntop(af, src, dst, (socklen_t)size); + uv_inet_ntop(af, src, dst, size); + return dst; } #endif /* GRPC_UV */ diff --git a/src/core/lib/iomgr/tcp_custom.h b/src/core/lib/iomgr/tcp_custom.h index 22caa149f8..784ef84222 100644 --- a/src/core/lib/iomgr/tcp_custom.h +++ b/src/core/lib/iomgr/tcp_custom.h @@ -62,8 +62,6 @@ typedef struct grpc_socket_vtable { const grpc_sockaddr* addr, int* len); grpc_error* (*getsockname)(grpc_custom_socket* socket, const grpc_sockaddr* addr, int* len); - grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname, - const void* optval, uint32_t optlen); grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, size_t len, int flags); grpc_error* (*listen)(grpc_custom_socket* socket); diff --git a/src/core/lib/iomgr/tcp_server_custom.cc b/src/core/lib/iomgr/tcp_server_custom.cc index be92e61b62..79ba5c39ee 100644 --- a/src/core/lib/iomgr/tcp_server_custom.cc +++ b/src/core/lib/iomgr/tcp_server_custom.cc @@ -393,13 +393,6 @@ static grpc_error* tcp_server_add_port(grpc_tcp_server* s, grpc_custom_socket_vtable->init(socket, family); if (error == GRPC_ERROR_NONE) { -#if defined(GPR_LINUX) && defined(SO_REUSEPORT) - if (family == AF_INET || family == AF_INET6) { - int enable = 1; - grpc_custom_socket_vtable->setsockopt(socket, SOL_SOCKET, SO_REUSEPORT, - &enable, sizeof(enable)); - } -#endif /* GPR_LINUX && SO_REUSEPORT */ error = add_socket_to_server(s, socket, addr, port_index, &sp); } gpr_free(allocated_addr); diff --git a/src/core/lib/iomgr/tcp_server_windows.cc b/src/core/lib/iomgr/tcp_server_windows.cc index 77f3811dca..b01afdcc9d 100644 --- a/src/core/lib/iomgr/tcp_server_windows.cc +++ b/src/core/lib/iomgr/tcp_server_windows.cc @@ -129,6 +129,7 @@ static void destroy_server(void* arg, grpc_error* error) { gpr_free(sp); } grpc_channel_args_destroy(s->channel_args); + gpr_mu_destroy(&s->mu); gpr_free(s); } diff --git a/src/core/lib/iomgr/tcp_uv.cc b/src/core/lib/iomgr/tcp_uv.cc index 5e3166926b..f20f8dcb74 100644 --- a/src/core/lib/iomgr/tcp_uv.cc +++ b/src/core/lib/iomgr/tcp_uv.cc @@ -192,6 +192,15 @@ static grpc_error* uv_socket_init_helper(uv_socket_t* uv_socket, int domain) { if (status != 0) { return tcp_error_create("Failed to initialize UV tcp handle", status); } +#if defined(GPR_LINUX) && defined(SO_REUSEPORT) + if (domain == AF_INET || domain == AF_INET6) { + int enable = 1; + int fd; + uv_fileno((uv_handle_t*)tcp, &fd); + // TODO Handle error here. + setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &enable, sizeof(enable)); + } +#endif uv_socket->write_buffers = nullptr; uv_socket->read_len = 0; uv_tcp_nodelay(uv_socket->handle, 1); @@ -299,17 +308,6 @@ static grpc_error* uv_socket_listen(grpc_custom_socket* socket) { return tcp_error_create("Failed to listen to port", status); } -static grpc_error* uv_socket_setsockopt(grpc_custom_socket* socket, int level, - int option_name, const void* optval, - socklen_t option_len) { - int fd; - uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; - uv_fileno((uv_handle_t*)uv_socket->handle, &fd); - // TODO Handle error here. Also, does this work on windows?? - setsockopt(fd, level, option_name, &optval, (socklen_t)option_len); - return GRPC_ERROR_NONE; -} - static void uv_tc_on_connect(uv_connect_t* req, int status) { grpc_custom_socket* socket = (grpc_custom_socket*)req->data; uv_socket_t* uv_socket = (uv_socket_t*)socket->impl; @@ -340,7 +338,6 @@ static void uv_socket_connect(grpc_custom_socket* socket, static grpc_resolved_addresses* handle_addrinfo_result( struct addrinfo* result) { struct addrinfo* resp; - struct addrinfo* prev; size_t i; grpc_resolved_addresses* addresses = (grpc_resolved_addresses*)gpr_malloc(sizeof(grpc_resolved_addresses)); @@ -350,16 +347,13 @@ static grpc_resolved_addresses* handle_addrinfo_result( } addresses->addrs = (grpc_resolved_address*)gpr_malloc( sizeof(grpc_resolved_address) * addresses->naddrs); - i = 0; - resp = result; - while (resp != nullptr) { + for (resp = result, i = 0; resp != nullptr; resp = resp->ai_next, i++) { memcpy(&addresses->addrs[i].addr, resp->ai_addr, resp->ai_addrlen); addresses->addrs[i].len = resp->ai_addrlen; - i++; - prev = resp; - resp = resp->ai_next; - gpr_free(prev); } + // addrinfo objects are allocated by libuv (e.g. in uv_getaddrinfo) + // and not by gpr_malloc + uv_freeaddrinfo(result); return addresses; } @@ -415,10 +409,9 @@ static void uv_resolve_async(grpc_custom_resolver* r, char* host, char* port) { grpc_custom_resolver_vtable uv_resolver_vtable = {uv_resolve, uv_resolve_async}; grpc_socket_vtable grpc_uv_socket_vtable = { - uv_socket_init, uv_socket_connect, uv_socket_destroy, - uv_socket_shutdown, uv_socket_close, uv_socket_write, - uv_socket_read, uv_socket_getpeername, uv_socket_getsockname, - uv_socket_setsockopt, uv_socket_bind, uv_socket_listen, - uv_socket_accept}; + uv_socket_init, uv_socket_connect, uv_socket_destroy, + uv_socket_shutdown, uv_socket_close, uv_socket_write, + uv_socket_read, uv_socket_getpeername, uv_socket_getsockname, + uv_socket_bind, uv_socket_listen, uv_socket_accept}; #endif diff --git a/src/core/lib/iomgr/timer_generic.cc b/src/core/lib/iomgr/timer_generic.cc index 9a4da59624..e8f14dd2e5 100644 --- a/src/core/lib/iomgr/timer_generic.cc +++ b/src/core/lib/iomgr/timer_generic.cc @@ -98,6 +98,12 @@ static void init_timer_ht() { } } +static void destroy_timer_ht() { + for (int i = 0; i < NUM_HASH_BUCKETS; i++) { + gpr_mu_destroy(&g_hash_mu[i]); + } +} + static bool is_in_ht(grpc_timer* t) { size_t i = GPR_HASH_POINTER(t, NUM_HASH_BUCKETS); @@ -189,6 +195,7 @@ static void validate_non_pending_timer(grpc_timer* t) { } #define INIT_TIMER_HASH_TABLE() init_timer_ht() +#define DESTROY_TIMER_HASH_TABLE() destroy_timer_ht() #define ADD_TO_HASH_TABLE(t) add_to_ht((t)) #define REMOVE_FROM_HASH_TABLE(t) remove_from_ht((t)) #define VALIDATE_NON_PENDING_TIMER(t) validate_non_pending_timer((t)) @@ -196,6 +203,7 @@ static void validate_non_pending_timer(grpc_timer* t) { #else #define INIT_TIMER_HASH_TABLE() +#define DESTROY_TIMER_HASH_TABLE() #define ADD_TO_HASH_TABLE(t) #define REMOVE_FROM_HASH_TABLE(t) #define VALIDATE_NON_PENDING_TIMER(t) @@ -299,6 +307,8 @@ static void timer_list_shutdown() { gpr_free(g_shards); gpr_free(g_shard_queue); g_shared_mutables.initialized = false; + + DESTROY_TIMER_HASH_TABLE(); } /* returns true if the first element in the list */ diff --git a/src/core/lib/slice/slice.cc b/src/core/lib/slice/slice.cc index 585b41cf91..419474129b 100644 --- a/src/core/lib/slice/slice.cc +++ b/src/core/lib/slice/slice.cc @@ -69,8 +69,12 @@ grpc_slice grpc_slice_ref(grpc_slice slice) { /* Public API */ void grpc_slice_unref(grpc_slice slice) { - grpc_core::ExecCtx exec_ctx; - grpc_slice_unref_internal(slice); + if (grpc_core::ExecCtx::Get() == nullptr) { + grpc_core::ExecCtx exec_ctx; + grpc_slice_unref_internal(slice); + } else { + grpc_slice_unref_internal(slice); + } } /* grpc_slice_from_static_string support structure - a refcount that does diff --git a/src/core/lib/slice/slice_buffer.cc b/src/core/lib/slice/slice_buffer.cc index 58ca495517..fd56997388 100644 --- a/src/core/lib/slice/slice_buffer.cc +++ b/src/core/lib/slice/slice_buffer.cc @@ -75,8 +75,12 @@ void grpc_slice_buffer_destroy_internal(grpc_slice_buffer* sb) { } void grpc_slice_buffer_destroy(grpc_slice_buffer* sb) { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer_destroy_internal(sb); + if (grpc_core::ExecCtx::Get() == nullptr) { + grpc_core::ExecCtx exec_ctx; + grpc_slice_buffer_destroy_internal(sb); + } else { + grpc_slice_buffer_destroy_internal(sb); + } } uint8_t* grpc_slice_buffer_tiny_add(grpc_slice_buffer* sb, size_t n) { @@ -176,8 +180,12 @@ void grpc_slice_buffer_reset_and_unref_internal(grpc_slice_buffer* sb) { } void grpc_slice_buffer_reset_and_unref(grpc_slice_buffer* sb) { - grpc_core::ExecCtx exec_ctx; - grpc_slice_buffer_reset_and_unref_internal(sb); + if (grpc_core::ExecCtx::Get() == nullptr) { + grpc_core::ExecCtx exec_ctx; + grpc_slice_buffer_reset_and_unref_internal(sb); + } else { + grpc_slice_buffer_reset_and_unref_internal(sb); + } } void grpc_slice_buffer_swap(grpc_slice_buffer* a, grpc_slice_buffer* b) { diff --git a/src/core/lib/surface/call.cc b/src/core/lib/surface/call.cc index c683cc02de..9a9113643d 100644 --- a/src/core/lib/surface/call.cc +++ b/src/core/lib/surface/call.cc @@ -878,8 +878,8 @@ static void set_encodings_accepted_by_peer(grpc_call* call, grpc_mdelem mdel, } else { char* accept_encoding_entry_str = grpc_slice_to_c_string(accept_encoding_entry_slice); - gpr_log(GPR_ERROR, - "Invalid entry in accept encoding metadata: '%s'. Ignoring.", + gpr_log(GPR_DEBUG, + "Unknown entry in accept encoding metadata: '%s'. Ignoring.", accept_encoding_entry_str); gpr_free(accept_encoding_entry_str); } diff --git a/src/core/lib/surface/version.cc b/src/core/lib/surface/version.cc index be196a78bc..a712e10037 100644 --- a/src/core/lib/surface/version.cc +++ b/src/core/lib/surface/version.cc @@ -25,4 +25,4 @@ const char* grpc_version_string(void) { return "6.0.0-dev"; } -const char* grpc_g_stands_for(void) { return "gorgeous"; } +const char* grpc_g_stands_for(void) { return "glorious"; } diff --git a/src/cpp/common/core_codegen.cc b/src/cpp/common/core_codegen.cc index aa9788da76..619aacadaa 100644 --- a/src/cpp/common/core_codegen.cc +++ b/src/cpp/common/core_codegen.cc @@ -98,6 +98,10 @@ void CoreCodegen::grpc_byte_buffer_destroy(grpc_byte_buffer* bb) { ::grpc_byte_buffer_destroy(bb); } +size_t CoreCodegen::grpc_byte_buffer_length(grpc_byte_buffer* bb) { + return ::grpc_byte_buffer_length(bb); +} + grpc_call_error CoreCodegen::grpc_call_cancel_with_status( grpc_call* call, grpc_status_code status, const char* description, void* reserved) { @@ -135,6 +139,12 @@ grpc_slice CoreCodegen::grpc_slice_new_with_user_data(void* p, size_t len, return ::grpc_slice_new_with_user_data(p, len, destroy, user_data); } +grpc_slice CoreCodegen::grpc_slice_new_with_len(void* p, size_t len, + void (*destroy)(void*, + size_t)) { + return ::grpc_slice_new_with_len(p, len, destroy); +} + grpc_slice CoreCodegen::grpc_empty_slice() { return ::grpc_empty_slice(); } grpc_slice CoreCodegen::grpc_slice_malloc(size_t length) { diff --git a/src/cpp/common/version_cc.cc b/src/cpp/common/version_cc.cc index fb1723c816..d669ea21a9 100644 --- a/src/cpp/common/version_cc.cc +++ b/src/cpp/common/version_cc.cc @@ -22,5 +22,5 @@ #include <grpcpp/grpcpp.h> namespace grpc { -grpc::string Version() { return "1.11.0-dev"; } +grpc::string Version() { return "1.12.0-dev"; } } // namespace grpc diff --git a/src/cpp/util/byte_buffer_cc.cc b/src/cpp/util/byte_buffer_cc.cc index fbc1768bcc..8700f96d8d 100644 --- a/src/cpp/util/byte_buffer_cc.cc +++ b/src/cpp/util/byte_buffer_cc.cc @@ -25,32 +25,6 @@ namespace grpc { static internal::GrpcLibraryInitializer g_gli_initializer; -ByteBuffer::ByteBuffer(const Slice* slices, size_t nslices) { - // The following assertions check that the representation of a grpc::Slice is - // identical to that of a grpc_slice: it has a grpc_slice field, and nothing - // else. - static_assert(std::is_same<decltype(slices[0].slice_), grpc_slice>::value, - "Slice must have same representation as grpc_slice"); - static_assert(sizeof(Slice) == sizeof(grpc_slice), - "Slice must have same representation as grpc_slice"); - // The following assertions check that the representation of a ByteBuffer is - // identical to grpc_byte_buffer*: it has a grpc_byte_buffer* field, - // and nothing else. - static_assert(std::is_same<decltype(buffer_), grpc_byte_buffer*>::value, - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - static_assert(sizeof(ByteBuffer) == sizeof(grpc_byte_buffer*), - "ByteBuffer must have same representation as " - "grpc_byte_buffer*"); - g_gli_initializer.summon(); // Make sure that initializer linked in - // The const_cast is legal if grpc_raw_byte_buffer_create() does no more - // than its advertised side effect of increasing the reference count of the - // slices it processes, and such an increase does not affect the semantics - // seen by the caller of this constructor. - buffer_ = grpc_raw_byte_buffer_create( - reinterpret_cast<grpc_slice*>(const_cast<Slice*>(slices)), nslices); -} - Status ByteBuffer::Dump(std::vector<Slice>* slices) const { slices->clear(); if (!buffer_) { @@ -69,14 +43,6 @@ Status ByteBuffer::Dump(std::vector<Slice>* slices) const { return Status::OK; } -size_t ByteBuffer::Length() const { - if (buffer_) { - return grpc_byte_buffer_length(buffer_); - } else { - return 0; - } -} - ByteBuffer::ByteBuffer(const ByteBuffer& buf) : buffer_(grpc_byte_buffer_copy(buf.buffer_)) {} @@ -90,10 +56,4 @@ ByteBuffer& ByteBuffer::operator=(const ByteBuffer& buf) { return *this; } -void ByteBuffer::Swap(ByteBuffer* other) { - grpc_byte_buffer* tmp = other->buffer_; - other->buffer_ = buffer_; - buffer_ = tmp; -} - } // namespace grpc diff --git a/src/cpp/util/slice_cc.cc b/src/cpp/util/slice_cc.cc deleted file mode 100644 index c72dbdbfc0..0000000000 --- a/src/cpp/util/slice_cc.cc +++ /dev/null @@ -1,55 +0,0 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/slice.h> -#include <grpcpp/support/slice.h> - -namespace grpc { - -Slice::Slice() : slice_(grpc_empty_slice()) {} - -Slice::~Slice() { grpc_slice_unref(slice_); } - -Slice::Slice(grpc_slice slice, AddRef) : slice_(grpc_slice_ref(slice)) {} - -Slice::Slice(grpc_slice slice, StealRef) : slice_(slice) {} - -Slice::Slice(size_t len) : slice_(grpc_slice_malloc(len)) {} - -Slice::Slice(const void* buf, size_t len) - : slice_( - grpc_slice_from_copied_buffer(static_cast<const char*>(buf), len)) {} - -Slice::Slice(const grpc::string& str) - : slice_(grpc_slice_from_copied_buffer(str.c_str(), str.length())) {} - -Slice::Slice(const void* buf, size_t len, StaticSlice) - : slice_( - grpc_slice_from_static_buffer(static_cast<const char*>(buf), len)) {} - -Slice::Slice(const Slice& other) : slice_(grpc_slice_ref(other.slice_)) {} - -Slice::Slice(void* buf, size_t len, void (*destroy)(void*), void* user_data) - : slice_(grpc_slice_new_with_user_data(buf, len, destroy, user_data)) {} - -Slice::Slice(void* buf, size_t len, void (*destroy)(void*, size_t)) - : slice_(grpc_slice_new_with_len(buf, len, destroy)) {} - -grpc_slice Slice::c_slice() const { return grpc_slice_ref(slice_); } - -} // namespace grpc diff --git a/src/csharp/Grpc.Core/Version.csproj.include b/src/csharp/Grpc.Core/Version.csproj.include index 9b55f2469a..f7a7a5cbe9 100755 --- a/src/csharp/Grpc.Core/Version.csproj.include +++ b/src/csharp/Grpc.Core/Version.csproj.include @@ -1,7 +1,7 @@ <!-- This file is generated --> <Project> <PropertyGroup> - <GrpcCsharpVersion>1.11.0-dev</GrpcCsharpVersion> + <GrpcCsharpVersion>1.12.0-dev</GrpcCsharpVersion> <GoogleProtobufVersion>3.3.0</GoogleProtobufVersion> </PropertyGroup> </Project> diff --git a/src/csharp/Grpc.Core/VersionInfo.cs b/src/csharp/Grpc.Core/VersionInfo.cs index 2902aee8d9..06a0396c34 100644 --- a/src/csharp/Grpc.Core/VersionInfo.cs +++ b/src/csharp/Grpc.Core/VersionInfo.cs @@ -33,11 +33,11 @@ namespace Grpc.Core /// <summary> /// Current <c>AssemblyFileVersion</c> of gRPC C# assemblies /// </summary> - public const string CurrentAssemblyFileVersion = "1.11.0.0"; + public const string CurrentAssemblyFileVersion = "1.12.0.0"; /// <summary> /// Current version of gRPC C# /// </summary> - public const string CurrentVersion = "1.11.0-dev"; + public const string CurrentVersion = "1.12.0-dev"; } } diff --git a/src/csharp/build_packages_dotnetcli.bat b/src/csharp/build_packages_dotnetcli.bat index 76391738ff..0d65748697 100755 --- a/src/csharp/build_packages_dotnetcli.bat +++ b/src/csharp/build_packages_dotnetcli.bat @@ -13,7 +13,7 @@ @rem limitations under the License. @rem Current package versions -set VERSION=1.11.0-dev +set VERSION=1.12.0-dev @rem Adjust the location of nuget.exe set NUGET=C:\nuget\nuget.exe diff --git a/src/csharp/build_packages_dotnetcli.sh b/src/csharp/build_packages_dotnetcli.sh index 1b73614b91..66aba36089 100755 --- a/src/csharp/build_packages_dotnetcli.sh +++ b/src/csharp/build_packages_dotnetcli.sh @@ -45,7 +45,7 @@ dotnet pack --configuration Release Grpc.Auth --output ../../../artifacts dotnet pack --configuration Release Grpc.HealthCheck --output ../../../artifacts dotnet pack --configuration Release Grpc.Reflection --output ../../../artifacts -nuget pack Grpc.nuspec -Version "1.11.0-dev" -OutputDirectory ../../artifacts -nuget pack Grpc.Tools.nuspec -Version "1.11.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.nuspec -Version "1.12.0-dev" -OutputDirectory ../../artifacts +nuget pack Grpc.Tools.nuspec -Version "1.12.0-dev" -OutputDirectory ../../artifacts (cd ../../artifacts && zip csharp_nugets_dotnetcli.zip *.nupkg) diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 954beed8e1..f06312bea9 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -42,7 +42,7 @@ Pod::Spec.new do |s| # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed # before them. s.name = '!ProtoCompiler-gRPCPlugin' - v = '1.11.0-dev' + v = '1.12.0-dev' s.version = v s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.' s.description = <<-DESC diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index cf70064298..501114dea0 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -355,17 +355,8 @@ static NSString * const kBearerPrefix = @"Bearer "; } dispatch_async(_callQueue, ^{ - __weak GRPCCall *weakSelf = self; - [self writeMessage:value withErrorHandler:^{ - __strong GRPCCall *strongSelf = weakSelf; - if (strongSelf != nil) { - [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; - // Wrapped call must be canceled when error is reported to upper layers - [strongSelf cancelCall]; - } - }]; + // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT + [self writeMessage:value withErrorHandler:nil]; }); } @@ -387,15 +378,8 @@ static NSString * const kBearerPrefix = @"Bearer "; [self cancel]; } else { dispatch_async(_callQueue, ^{ - __weak GRPCCall *weakSelf = self; - [self finishRequestWithErrorHandler:^{ - __strong GRPCCall *strongSelf = weakSelf; - [strongSelf maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain - code:GRPCErrorCodeInternal - userInfo:nil]]; - // Wrapped call must be canceled when error is reported to upper layers - [strongSelf cancelCall]; - }]; + // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT + [self finishRequestWithErrorHandler:nil]; }); } } diff --git a/src/objective-c/GRPCClient/private/version.h b/src/objective-c/GRPCClient/private/version.h index 405c2fff9f..a99eb289c3 100644 --- a/src/objective-c/GRPCClient/private/version.h +++ b/src/objective-c/GRPCClient/private/version.h @@ -23,4 +23,4 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.11.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.12.0-dev" diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 3bab7f6671..e716198009 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -31,6 +31,8 @@ #import <RxLibrary/GRXWriter+Immediate.h> #import <RxLibrary/GRXBufferedPipe.h> +#include <netinet/in.h> + #import "version.h" #define TEST_TIMEOUT 16 @@ -482,4 +484,40 @@ static GRPCProtoMethod *kFullDuplexCallMethod; [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +- (int)findFreePort { + struct sockaddr_in addr; + unsigned int addr_len = sizeof(addr); + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + int fd = socket(AF_INET, SOCK_STREAM, 0); + XCTAssertEqual(bind(fd, (struct sockaddr*)&addr, sizeof(addr)), 0); + XCTAssertEqual(getsockname(fd, (struct sockaddr*)&addr, &addr_len), 0); + XCTAssertEqual(addr_len, sizeof(addr)); + close(fd); + return addr.sin_port; +} + +- (void)testErrorCode { + int port = [self findFreePort]; + NSString * const kDummyAddress = [NSString stringWithFormat:@"localhost:%d", port]; + __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."]; + + GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress + path:kEmptyCallMethod.HTTPPath + requestsWriter:[GRXWriter writerWithValue:[NSData data]]]; + + id<GRXWriteable> responsesWriteable = [[GRXWriteable alloc] initWithValueHandler:^(NSData *value) { + // Should not reach here + XCTAssert(NO); + } completionHandler:^(NSError *errorOrNil) { + XCTAssertNotNil(errorOrNil, @"Finished with no error"); + XCTAssertEqual(errorOrNil.code, GRPC_STATUS_UNAVAILABLE); + [completion fulfill]; + }]; + + [call startWithWriteable:responsesWriteable]; + + [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; +} + @end diff --git a/src/objective-c/tests/version.h b/src/objective-c/tests/version.h index 6f6cd25007..15fe1a59f3 100644 --- a/src/objective-c/tests/version.h +++ b/src/objective-c/tests/version.h @@ -23,5 +23,5 @@ // `tools/buildgen/generate_projects.sh`. -#define GRPC_OBJC_VERSION_STRING @"1.11.0-dev" +#define GRPC_OBJC_VERSION_STRING @"1.12.0-dev" #define GRPC_C_VERSION_STRING @"6.0.0-dev" diff --git a/src/php/composer.json b/src/php/composer.json index dbf0cc35fd..57d911db79 100644 --- a/src/php/composer.json +++ b/src/php/composer.json @@ -2,7 +2,7 @@ "name": "grpc/grpc-dev", "description": "gRPC library for PHP - for Developement use only", "license": "Apache-2.0", - "version": "1.11.0", + "version": "1.12.0", "require": { "php": ">=5.5.0", "google/protobuf": "^v3.3.0" diff --git a/src/php/ext/grpc/version.h b/src/php/ext/grpc/version.h index dd2a701ada..37df2768bf 100644 --- a/src/php/ext/grpc/version.h +++ b/src/php/ext/grpc/version.h @@ -20,6 +20,6 @@ #ifndef VERSION_H #define VERSION_H -#define PHP_GRPC_VERSION "1.11.0dev" +#define PHP_GRPC_VERSION "1.12.0dev" #endif /* VERSION_H */ diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd index f35aebadde..f5688d08cd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd @@ -83,8 +83,6 @@ cdef extern from "src/core/lib/iomgr/tcp_custom.h": const grpc_sockaddr* addr, int* len); grpc_error* (*getsockname)(grpc_custom_socket* socket, const grpc_sockaddr* addr, int* len); - grpc_error* (*setsockopt)(grpc_custom_socket* socket, int level, int optname, - const void* optval, uint32_t len); grpc_error* (*bind)(grpc_custom_socket* socket, const grpc_sockaddr* addr, size_t len, int flags); grpc_error* (*listen)(grpc_custom_socket* socket); diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx index ba8c731891..31ef671aed 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx @@ -239,11 +239,6 @@ cdef grpc_error* socket_getsockname(grpc_custom_socket* socket, length[0] = c_addr.len return grpc_error_none() -cdef grpc_error* socket_setsockopt(grpc_custom_socket* socket, int level, int optname, - const void *optval, uint32_t optlen) with gil: - # No-op; we provide a default set of options - return grpc_error_none() - def applysockopts(s): s.setsockopt(gevent_socket.SOL_SOCKET, gevent_socket.SO_REUSEADDR, 1) s.setsockopt(gevent_socket.IPPROTO_TCP, gevent_socket.TCP_NODELAY, True) @@ -435,7 +430,6 @@ def init_grpc_gevent(): gevent_socket_vtable.read = socket_read gevent_socket_vtable.getpeername = socket_getpeername gevent_socket_vtable.getsockname = socket_getsockname - gevent_socket_vtable.setsockopt = socket_setsockopt gevent_socket_vtable.bind = socket_bind gevent_socket_vtable.listen = socket_listen gevent_socket_vtable.accept = socket_accept diff --git a/src/python/grpcio/grpc/_grpcio_metadata.py b/src/python/grpcio/grpc/_grpcio_metadata.py index 4a69d859fc..cb5da72f1f 100644 --- a/src/python/grpcio/grpc/_grpcio_metadata.py +++ b/src/python/grpcio/grpc/_grpcio_metadata.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc/_grpcio_metadata.py.template`!!! -__version__ = """1.11.0.dev0""" +__version__ = """1.12.0.dev0""" diff --git a/src/python/grpcio/grpc_version.py b/src/python/grpcio/grpc_version.py index 32e82493f3..de5a780abd 100644 --- a/src/python/grpcio/grpc_version.py +++ b/src/python/grpcio/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.12.0.dev0' diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py index ad4c85cc12..afcd316e5c 100644 --- a/src/python/grpcio_health_checking/grpc_version.py +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.12.0.dev0' diff --git a/src/python/grpcio_reflection/grpc_version.py b/src/python/grpcio_reflection/grpc_version.py index 6322d847b1..824b73201d 100644 --- a/src/python/grpcio_reflection/grpc_version.py +++ b/src/python/grpcio_reflection/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_reflection/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.12.0.dev0' diff --git a/src/python/grpcio_testing/grpc_version.py b/src/python/grpcio_testing/grpc_version.py index 1e75fea12e..5b1f4c4cc0 100644 --- a/src/python/grpcio_testing/grpc_version.py +++ b/src/python/grpcio_testing/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_testing/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.12.0.dev0' diff --git a/src/python/grpcio_tests/grpc_version.py b/src/python/grpcio_tests/grpc_version.py index 0cd7bd257f..382f95018e 100644 --- a/src/python/grpcio_tests/grpc_version.py +++ b/src/python/grpcio_tests/grpc_version.py @@ -14,4 +14,4 @@ # AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_tests/grpc_version.py.template`!!! -VERSION = '1.11.0.dev0' +VERSION = '1.12.0.dev0' diff --git a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb index 8f078cfbed..8ec2073d98 100755 --- a/src/ruby/end2end/multiple_killed_watching_threads_driver.rb +++ b/src/ruby/end2end/multiple_killed_watching_threads_driver.rb @@ -54,8 +54,14 @@ def run_multiple_killed_watches(num_threads, sleep_time) end def main + STDERR.puts '10 iterations, sleep 0.1 before killing thread' run_multiple_killed_watches(10, 0.1) + STDERR.puts '1000 iterations, sleep 0.001 before killing thread' run_multiple_killed_watches(1000, 0.001) + STDERR.puts '10000 iterations, sleep 0.00001 before killing thread' + run_multiple_killed_watches(10_000, 0.00001) + STDERR.puts '20000 iterations, sleep 0.00001 before killing thread' + run_multiple_killed_watches(20_000, 0.00001) end main diff --git a/src/ruby/ext/grpc/extconf.rb b/src/ruby/ext/grpc/extconf.rb index e8e87e4f15..4760f33e38 100644 --- a/src/ruby/ext/grpc/extconf.rb +++ b/src/ruby/ext/grpc/extconf.rb @@ -84,7 +84,7 @@ if grpc_config == 'gcov' end if grpc_config == 'dbg' - $CFLAGS << ' -O0' + $CFLAGS << ' -O0 -ggdb3' end $LDFLAGS << ' -Wl,-wrap,memcpy' if RUBY_PLATFORM =~ /linux/ diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index e8bfeb32a0..3f0dc530cf 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -315,7 +315,7 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE* argv, } typedef struct watch_state_stack { - grpc_channel* channel; + bg_watched_channel* bg_wrapped; gpr_timespec deadline; int last_state; } watch_state_stack; @@ -328,15 +328,15 @@ static void* wait_for_watch_state_op_complete_without_gvl(void* arg) { gpr_mu_lock(&global_connection_polling_mu); // its unsafe to do a "watch" after "channel polling abort" because the cq has // been shut down. - if (abort_channel_polling) { + if (abort_channel_polling || stack->bg_wrapped->channel_destroyed) { gpr_mu_unlock(&global_connection_polling_mu); return (void*)0; } op = gpr_zalloc(sizeof(watch_state_op)); op->op_type = WATCH_STATE_API; - grpc_channel_watch_connectivity_state(stack->channel, stack->last_state, - stack->deadline, channel_polling_cq, - op); + grpc_channel_watch_connectivity_state(stack->bg_wrapped->channel, + stack->last_state, stack->deadline, + channel_polling_cq, op); while (!op->op.api_callback_args.called_back) { gpr_cv_wait(&global_connection_polling_cv, &global_connection_polling_mu, @@ -388,7 +388,7 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, return Qnil; } - stack.channel = wrapper->bg_wrapped->channel; + stack.bg_wrapped = wrapper->bg_wrapped; stack.deadline = grpc_rb_time_timeval(deadline, 0), stack.last_state = NUM2LONG(last_state); diff --git a/src/ruby/lib/grpc/generic/bidi_call.rb b/src/ruby/lib/grpc/generic/bidi_call.rb index 086455db0b..ffb232b827 100644 --- a/src/ruby/lib/grpc/generic/bidi_call.rb +++ b/src/ruby/lib/grpc/generic/bidi_call.rb @@ -124,12 +124,18 @@ module GRPC def read_using_run_batch ops = { RECV_MESSAGE => nil } ops[RECV_INITIAL_METADATA] = nil unless @metadata_received - batch_result = @call.run_batch(ops) - unless @metadata_received - @call.metadata = batch_result.metadata - @metadata_received = true + begin + batch_result = @call.run_batch(ops) + unless @metadata_received + @call.metadata = batch_result.metadata + @metadata_received = true + end + batch_result + rescue GRPC::Core::CallError => e + GRPC.logger.warn('bidi call: read_using_run_batch failed') + GRPC.logger.warn(e) + nil end - batch_result end # set_output_stream_done is relevant on client-side @@ -155,7 +161,12 @@ module GRPC GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") - @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) + begin + @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil) + rescue GRPC::Core::CallError => e + GRPC.logger.warn('bidi-write-loop: send close failed') + GRPC.logger.warn(e) + end GRPC.logger.debug('bidi-write-loop: done') end GRPC.logger.debug('bidi-write-loop: finished') @@ -187,7 +198,7 @@ module GRPC batch_result = read_using_run_batch # handle the next message - if batch_result.message.nil? + if batch_result.nil? || batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") if is_client diff --git a/src/ruby/lib/grpc/version.rb b/src/ruby/lib/grpc/version.rb index 256a543a9f..2cb7c4be53 100644 --- a/src/ruby/lib/grpc/version.rb +++ b/src/ruby/lib/grpc/version.rb @@ -14,5 +14,5 @@ # GRPC contains the General RPC module. module GRPC - VERSION = '1.11.0.dev' + VERSION = '1.12.0.dev' end diff --git a/src/ruby/spec/generic/client_stub_spec.rb b/src/ruby/spec/generic/client_stub_spec.rb index d858c4e3fe..da50f8d0c9 100644 --- a/src/ruby/spec/generic/client_stub_spec.rb +++ b/src/ruby/spec/generic/client_stub_spec.rb @@ -750,6 +750,90 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength expected_error_message) end end + + # Prompted by grpc/github #14853 + describe 'client-side error handling on bidi streams' do + class EnumeratorQueue + def initialize(queue) + @queue = queue + end + + def each + loop do + msg = @queue.pop + break if msg.nil? + yield msg + end + end + end + + def run_server_bidi_shutdown_after_one_read + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + expect(server_call.remote_read).to eq('first message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when writes to a bidi stream fail' do + # This test tries to trigger the case when a 'SEND_MESSAGE' op + # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails. + # In this case, iteration through the response stream should result + # in a grpc status code, and the writer thread should not raise an + # exception. + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_read + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + request_queue.push('first message') + # Now wait for the server to shut down. + server_thread.join + # Sanity check. This test is not interesting if + # Thread.abort_on_exception is not set. + expect(Thread.abort_on_exception).to be(true) + # An attempt to send a second message should fail now that the + # server is down. + request_queue.push('second message') + request_queue.push(nil) + expect { responses.next }.to raise_error(GRPC::BadStatus) + end + + def run_server_bidi_shutdown_after_one_write + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('message') + @server.shutdown_and_notify(from_relative_time(0)) + @server.close + end + + it 'receives a grpc status code when reading from a failed bidi call' do + server_thread = Thread.new do + run_server_bidi_shutdown_after_one_write + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('message') + # Wait for the server to shut down + server_thread.join + expect { responses.next }.to raise_error(GRPC::BadStatus) + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end + end end describe 'without a call operation' do @@ -810,6 +894,55 @@ describe 'ClientStub' do # rubocop:disable Metrics/BlockLength responses.each { |r| p r } end end + + def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + @server.start + recvd_rpc = @server.request_call + recvd_call = recvd_rpc.call + server_call = GRPC::ActiveCall.new( + recvd_call, noop, noop, INFINITE_FUTURE, + metadata_received: true, started: false) + server_call.send_initial_metadata + server_call.remote_send('server call received') + wait_for_shutdown_ok_callback.call + # since the client is cancelling the call, + # we should be able to shut down cleanly + @server.shutdown_and_notify(nil) + @server.close + end + + it 'receives a grpc status code when reading from a cancelled bidi call' do + # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or + # 'RECV_MESSAGE' op failure. + # An attempt to read a message might fail; in that case, iteration + # through the response stream should still result in a grpc status. + server_can_shutdown = false + server_can_shutdown_mu = Mutex.new + server_can_shutdown_cv = ConditionVariable.new + wait_for_shutdown_ok_callback = proc do + server_can_shutdown_mu.synchronize do + server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown + end + end + server_thread = Thread.new do + run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback) + end + stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure) + request_queue = Queue.new + @sent_msgs = EnumeratorQueue.new(request_queue) + responses = get_responses(stub) + expect(responses.next).to eq('server call received') + @op.cancel + expect { responses.next }.to raise_error(GRPC::Cancelled) + # Now let the server proceed to shut down. + server_can_shutdown_mu.synchronize do + server_can_shutdown = true + server_can_shutdown_cv.broadcast + end + server_thread.join + # Push a sentinel to allow the writer thread to finish + request_queue.push(nil) + end end end diff --git a/src/ruby/spec/pb/package_with_underscore/checker_spec.rb b/src/ruby/spec/pb/package_with_underscore/checker_spec.rb index 6155b3b934..dac7c14a9a 100644 --- a/src/ruby/spec/pb/package_with_underscore/checker_spec.rb +++ b/src/ruby/spec/pb/package_with_underscore/checker_spec.rb @@ -15,16 +15,13 @@ require 'open3' require 'tmpdir' -def debug_mode? - !ENV['CONFIG'].nil? && ENV['CONFIG'] == 'dbg' -end - describe 'Package with underscore protobuf code generation' do it 'should have the same content as created by code generation' do root_dir = File.join(File.dirname(__FILE__), '..', '..', '..', '..', '..') pb_dir = File.join(root_dir, 'src', 'ruby', 'spec', 'pb') - bins_sub_dir = debug_mode? ? 'dbg' : 'opt' + fail 'CONFIG env variable unexpectedly unset' unless ENV['CONFIG'] + bins_sub_dir = ENV['CONFIG'] bins_dir = File.join(root_dir, 'bins', bins_sub_dir) plugin = File.join(bins_dir, 'grpc_ruby_plugin') diff --git a/src/ruby/tools/version.rb b/src/ruby/tools/version.rb index 8dc1623d6f..822f70eb0a 100644 --- a/src/ruby/tools/version.rb +++ b/src/ruby/tools/version.rb @@ -14,6 +14,6 @@ module GRPC module Tools - VERSION = '1.11.0.dev' + VERSION = '1.12.0.dev' end end |