diff options
Diffstat (limited to 'src')
77 files changed, 918 insertions, 564 deletions
diff --git a/src/compiler/csharp_generator.cc b/src/compiler/csharp_generator.cc index 1c51f6da94..a923ce8e38 100644 --- a/src/compiler/csharp_generator.cc +++ b/src/compiler/csharp_generator.cc @@ -202,7 +202,8 @@ std::string GetCSharpMethodType(MethodType method_type) { std::string GetServiceNameFieldName() { return "__ServiceName"; } std::string GetMarshallerFieldName(const Descriptor* message) { - return "__Marshaller_" + message->name(); + return "__Marshaller_" + + grpc_generator::StringReplace(message->full_name(), ".", "_", true); } std::string GetMethodFieldName(const MethodDescriptor* method) { diff --git a/src/core/ext/filters/client_channel/http_proxy.cc b/src/core/ext/filters/client_channel/http_proxy.cc index 29a6c0e367..9baccd8628 100644 --- a/src/core/ext/filters/client_channel/http_proxy.cc +++ b/src/core/ext/filters/client_channel/http_proxy.cc @@ -83,11 +83,24 @@ done: return proxy_name; } +/** + * Checks the value of GRPC_ARG_ENABLE_HTTP_PROXY to determine if http_proxy + * should be used. + */ +bool http_proxy_enabled(const grpc_channel_args* args) { + const grpc_arg* arg = + grpc_channel_args_find(args, GRPC_ARG_ENABLE_HTTP_PROXY); + return grpc_channel_arg_get_bool(arg, true); +} + static bool proxy_mapper_map_name(grpc_proxy_mapper* mapper, const char* server_uri, const grpc_channel_args* args, char** name_to_resolve, grpc_channel_args** new_args) { + if (!http_proxy_enabled(args)) { + return false; + } char* user_cred = nullptr; *name_to_resolve = get_http_proxy_server(&user_cred); if (*name_to_resolve == nullptr) return false; diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc index 07f2e2efba..f496e9694d 100644 --- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc +++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_ev_driver_posix.cc @@ -104,17 +104,11 @@ static void fd_node_destroy(fd_node* fdn) { GPR_ASSERT(!fdn->writable_registered); GPR_ASSERT(fdn->already_shutdown); gpr_mu_destroy(&fdn->mu); - /* TODO: we need to pass a non-null "release_fd" parameter to - * grpc_fd_orphan because "epollsig" iomgr will close the fd - * even if "already_closed" is true, and it only leaves it open - * if "release_fd" is non-null. This is unlike the rest of the - * pollers, should this be changed within epollsig? */ - int dummy_release_fd; /* c-ares library has closed the fd inside grpc_fd. This fd may be picked up immediately by another thread, and should not be closed by the following grpc_fd_orphan. */ - grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, true /* already_closed */, - "c-ares query finished"); + int dummy_release_fd; + grpc_fd_orphan(fdn->fd, nullptr, &dummy_release_fd, "c-ares query finished"); gpr_free(fdn); } @@ -276,7 +270,7 @@ static void grpc_ares_notify_on_event_locked(grpc_ares_ev_driver* ev_driver) { gpr_asprintf(&fd_name, "ares_ev_driver-%" PRIuPTR, i); fdn = static_cast<fd_node*>(gpr_malloc(sizeof(fd_node))); gpr_log(GPR_DEBUG, "new fd: %d", socks[i]); - fdn->fd = grpc_fd_create(socks[i], fd_name); + fdn->fd = grpc_fd_create(socks[i], fd_name, false); fdn->ev_driver = ev_driver; fdn->readable_registered = false; fdn->writable_registered = false; diff --git a/src/core/ext/filters/deadline/deadline_filter.cc b/src/core/ext/filters/deadline/deadline_filter.cc index 27d3eac8d6..e0a41a3637 100644 --- a/src/core/ext/filters/deadline/deadline_filter.cc +++ b/src/core/ext/filters/deadline/deadline_filter.cc @@ -289,11 +289,10 @@ static void client_start_transport_stream_op_batch( static void recv_initial_metadata_ready(void* arg, grpc_error* error) { grpc_call_element* elem = static_cast<grpc_call_element*>(arg); server_call_data* calld = static_cast<server_call_data*>(elem->call_data); - // Get deadline from metadata and start the timer if needed. start_timer_if_needed(elem, calld->recv_initial_metadata->deadline); // Invoke the next callback. - calld->next_recv_initial_metadata_ready->cb( - calld->next_recv_initial_metadata_ready->cb_arg, error); + GRPC_CLOSURE_RUN(calld->next_recv_initial_metadata_ready, + GRPC_ERROR_REF(error)); } // Method for starting a call op for server filter. diff --git a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc index 0d349e2a89..a8f70333b2 100644 --- a/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc +++ b/src/core/ext/filters/load_reporting/server_load_reporting_filter.cc @@ -82,9 +82,7 @@ static void on_initial_md_ready(void* user_data, grpc_error* err) { } else { GRPC_ERROR_REF(err); } - calld->ops_recv_initial_metadata_ready->cb( - calld->ops_recv_initial_metadata_ready->cb_arg, err); - GRPC_ERROR_UNREF(err); + GRPC_CLOSURE_RUN(calld->ops_recv_initial_metadata_ready, err); } /* Constructor for call_data */ diff --git a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc index b95c9dae53..dfed824cd5 100644 --- a/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc +++ b/src/core/ext/transport/chttp2/client/insecure/channel_create_posix.cc @@ -50,7 +50,7 @@ grpc_channel* grpc_insecure_channel_create_from_fd( GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_client_create_from_fd( - grpc_fd_create(fd, "client"), args, "fd-client"); + grpc_fd_create(fd, "client", false), args, "fd-client"); grpc_transport* transport = grpc_create_chttp2_transport(final_args, client, true); diff --git a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc index 371e463814..a0228785ee 100644 --- a/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc +++ b/src/core/ext/transport/chttp2/server/insecure/server_chttp2_posix.cc @@ -43,8 +43,9 @@ void grpc_server_add_insecure_channel_from_fd(grpc_server* server, char* name; gpr_asprintf(&name, "fd:%d", fd); - grpc_endpoint* server_endpoint = grpc_tcp_create( - grpc_fd_create(fd, name), grpc_server_get_channel_args(server), name); + grpc_endpoint* server_endpoint = + grpc_tcp_create(grpc_fd_create(fd, name, false), + grpc_server_get_channel_args(server), name); gpr_free(name); diff --git a/src/core/lib/iomgr/cfstream_handle.cc b/src/core/lib/iomgr/cfstream_handle.cc index 30f4e65632..827fd24831 100644 --- a/src/core/lib/iomgr/cfstream_handle.cc +++ b/src/core/lib/iomgr/cfstream_handle.cc @@ -116,7 +116,9 @@ CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream, open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); - CFStreamClientContext ctx = {0, static_cast<void*>(this), nil, nil, nil}; + CFStreamClientContext ctx = {0, static_cast<void*>(this), + CFStreamHandle::Retain, CFStreamHandle::Release, + nil}; CFReadStreamSetClient( read_stream, kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | diff --git a/src/core/lib/iomgr/endpoint_pair_posix.cc b/src/core/lib/iomgr/endpoint_pair_posix.cc index 49850ab3a1..5c5c246f99 100644 --- a/src/core/lib/iomgr/endpoint_pair_posix.cc +++ b/src/core/lib/iomgr/endpoint_pair_posix.cc @@ -59,11 +59,11 @@ grpc_endpoint_pair grpc_iomgr_create_endpoint_pair(const char* name, grpc_core::ExecCtx exec_ctx; gpr_asprintf(&final_name, "%s:client", name); - p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name), args, + p.client = grpc_tcp_create(grpc_fd_create(sv[1], final_name, false), args, "socketpair-server"); gpr_free(final_name); gpr_asprintf(&final_name, "%s:server", name); - p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name), args, + p.server = grpc_tcp_create(grpc_fd_create(sv[0], final_name, false), args, "socketpair-client"); gpr_free(final_name); diff --git a/src/core/lib/iomgr/error.cc b/src/core/lib/iomgr/error.cc index 4088cf612e..90ed34da11 100644 --- a/src/core/lib/iomgr/error.cc +++ b/src/core/lib/iomgr/error.cc @@ -312,6 +312,12 @@ static void internal_add_error(grpc_error** err, grpc_error* new_err) { // It is very common to include and extra int and string in an error #define SURPLUS_CAPACITY (2 * SLOTS_PER_INT + SLOTS_PER_TIME) +static bool g_error_creation_allowed = true; + +void grpc_disable_error_creation() { g_error_creation_allowed = false; } + +void grpc_enable_error_creation() { g_error_creation_allowed = true; } + grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc, grpc_error** referencing, size_t num_referencing) { @@ -326,6 +332,12 @@ grpc_error* grpc_error_create(const char* file, int line, grpc_slice desc, return GRPC_ERROR_OOM; } #ifndef NDEBUG + if (!g_error_creation_allowed) { + gpr_log(GPR_ERROR, + "Error creation occurred when error creation was disabled [%s:%d]", + file, line); + abort(); + } if (grpc_trace_error_refcount.enabled()) { gpr_log(GPR_DEBUG, "%p create [%s:%d]", err, file, line); } diff --git a/src/core/lib/iomgr/error.h b/src/core/lib/iomgr/error.h index f8cae4da82..27c4d22fd1 100644 --- a/src/core/lib/iomgr/error.h +++ b/src/core/lib/iomgr/error.h @@ -123,6 +123,11 @@ typedef enum { #define GRPC_ERROR_OOM ((grpc_error*)2) #define GRPC_ERROR_CANCELLED ((grpc_error*)4) +// debug only toggles that allow for a sanity to check that ensures we will +// never create any errors in the per-RPC hotpath. +void grpc_disable_error_creation(); +void grpc_enable_error_creation(); + const char* grpc_error_string(grpc_error* error); /// Create an error - but use GRPC_ERROR_CREATE instead diff --git a/src/core/lib/iomgr/ev_epoll1_linux.cc b/src/core/lib/iomgr/ev_epoll1_linux.cc index cf839619cd..86a0243d2e 100644 --- a/src/core/lib/iomgr/ev_epoll1_linux.cc +++ b/src/core/lib/iomgr/ev_epoll1_linux.cc @@ -136,6 +136,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; @@ -272,7 +273,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -286,11 +287,12 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } - new_fd->fd = fd; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -307,7 +309,13 @@ static grpc_fd* fd_create(int fd, const char* name) { struct epoll_event ev; ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = new_fd; + /* Use the least significant bit of ev.data.ptr to store track_err. We expect + * the addresses to be word aligned. We need to store track_err to avoid + * synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(new_fd) | + (track_err ? 1 : 0)); if (epoll_ctl(g_epoll_set.epfd, EPOLL_CTL_ADD, fd, &ev) != 0) { gpr_log(GPR_ERROR, "epoll_ctl failed: %s", strerror(errno)); } @@ -327,6 +335,7 @@ static void fd_shutdown_internal(grpc_fd* fd, grpc_error* why, shutdown(fd->fd, SHUT_RDWR); } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -337,7 +346,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; bool is_release_fd = (release_fd != nullptr); @@ -350,7 +359,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (is_release_fd) { *release_fd = fd->fd; - } else if (!already_closed) { + } else { close(fd->fd); } @@ -359,6 +368,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, grpc_iomgr_unregister_object(&fd->iomgr_object); fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_lock(&fd_freelist_mu); fd->freelist_next = fd_freelist; @@ -383,6 +393,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { fd->read_closure->SetReady(); /* Use release store to match with acquire load in fd_get_read_notifier */ @@ -391,6 +405,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + /******************************************************************************* * Pollset Definitions */ @@ -611,16 +627,25 @@ static grpc_error* process_epoll_events(grpc_pollset* pollset) { append_error(&error, grpc_wakeup_fd_consume_wakeup(&global_wakeup_fd), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & static_cast<intptr_t>(1); + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } - if (read_ev || cancel) { + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1183,6 +1208,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1190,6 +1216,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollex_linux.cc b/src/core/lib/iomgr/ev_epollex_linux.cc index 993ea9480e..111b62171b 100644 --- a/src/core/lib/iomgr/ev_epollex_linux.cc +++ b/src/core/lib/iomgr/ev_epollex_linux.cc @@ -175,6 +175,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -184,6 +185,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; static void fd_global_init(void); @@ -309,6 +313,7 @@ static void fd_destroy(void* arg, grpc_error* error) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } @@ -348,7 +353,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -362,6 +367,7 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd = static_cast<grpc_fd*>(gpr_malloc(sizeof(grpc_fd))); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } gpr_mu_init(&new_fd->pollable_mu); @@ -369,9 +375,11 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->pollable_obj = nullptr; gpr_atm_rel_store(&new_fd->refst, (gpr_atm)1); new_fd->fd = fd; + new_fd->track_err = track_err; new_fd->salt = gpr_atm_no_barrier_fetch_add(&g_fd_salt, 1); new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -395,8 +403,8 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - bool is_fd_closed = already_closed; + const char* reason) { + bool is_fd_closed = false; gpr_mu_lock(&fd->orphan_mu); @@ -406,7 +414,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, descriptor fd->fd (but we still own the grpc_fd structure). */ if (release_fd != nullptr) { *release_fd = fd->fd; - } else if (!is_fd_closed) { + } else { close(fd->fd); is_fd_closed = true; } @@ -445,6 +453,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { } } fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -457,6 +466,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollable Definitions */ @@ -584,7 +597,12 @@ static grpc_error* pollable_add_fd(pollable* p, grpc_fd* fd) { struct epoll_event ev_fd; ev_fd.events = static_cast<uint32_t>(EPOLLET | EPOLLIN | EPOLLOUT | EPOLLEXCLUSIVE); - ev_fd.data.ptr = fd; + /* Use the second least significant bit of ev_fd.data.ptr to store track_err + * to avoid synchronization issues when accessing it after receiving an event. + * Accessing fd would be a data race there because the fd might have been + * returned to the free list at that point. */ + ev_fd.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fd) | + (fd->track_err ? 2 : 0)); GRPC_STATS_INC_SYSCALL_EPOLL_CTL(); if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd->fd, &ev_fd) != 0) { switch (errno) { @@ -785,6 +803,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static grpc_error* fd_get_or_become_pollable(grpc_fd* fd, pollable** p) { gpr_mu_lock(&fd->pollable_mu); grpc_error* error = GRPC_ERROR_NONE; @@ -853,20 +873,28 @@ static grpc_error* pollable_process_events(grpc_pollset* pollset, (intptr_t)data_ptr)), err_desc); } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - bool cancel = (ev->events & (EPOLLERR | EPOLLHUP)) != 0; + grpc_fd* fd = + reinterpret_cast<grpc_fd*>(reinterpret_cast<intptr_t>(data_ptr) & ~2); + bool track_err = reinterpret_cast<intptr_t>(data_ptr) & 2; + bool cancel = (ev->events & EPOLLHUP) != 0; + bool error = (ev->events & EPOLLERR) != 0; bool read_ev = (ev->events & (EPOLLIN | EPOLLPRI)) != 0; bool write_ev = (ev->events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + if (grpc_polling_trace.enabled()) { gpr_log(GPR_INFO, "PS:%p got fd %p: cancel=%d read=%d " "write=%d", pollset, fd, cancel, read_ev, write_ev); } - if (read_ev || cancel) { + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1508,6 +1536,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1515,6 +1544,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_epollsig_linux.cc b/src/core/lib/iomgr/ev_epollsig_linux.cc index a144817a83..2189801c18 100644 --- a/src/core/lib/iomgr/ev_epollsig_linux.cc +++ b/src/core/lib/iomgr/ev_epollsig_linux.cc @@ -132,6 +132,7 @@ struct grpc_fd { grpc_core::ManualConstructor<grpc_core::LockfreeEvent> read_closure; grpc_core::ManualConstructor<grpc_core::LockfreeEvent> write_closure; + grpc_core::ManualConstructor<grpc_core::LockfreeEvent> error_closure; struct grpc_fd* freelist_next; grpc_closure* on_done_closure; @@ -141,6 +142,9 @@ struct grpc_fd { gpr_atm read_notifier_pollset; grpc_iomgr_object iomgr_object; + + /* Do we need to track EPOLLERR events separately? */ + bool track_err; }; /* Reference counting for fds */ @@ -352,7 +356,10 @@ static void polling_island_add_fds_locked(polling_island* pi, grpc_fd** fds, for (i = 0; i < fd_count; i++) { ev.events = static_cast<uint32_t>(EPOLLIN | EPOLLOUT | EPOLLET); - ev.data.ptr = fds[i]; + /* Use the least significant bit of ev.data.ptr to store track_err to avoid + * synchronization issues when accessing it after receiving an event */ + ev.data.ptr = reinterpret_cast<void*>(reinterpret_cast<intptr_t>(fds[i]) | + (fds[i]->track_err ? 1 : 0)); err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_ADD, fds[i]->fd, &ev); if (err < 0) { @@ -435,7 +442,6 @@ static void polling_island_remove_all_fds_locked(polling_island* pi, /* The caller is expected to hold pi->mu lock before calling this function */ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, - bool is_fd_closed, grpc_error** error) { int err; size_t i; @@ -444,16 +450,14 @@ static void polling_island_remove_fd_locked(polling_island* pi, grpc_fd* fd, /* If fd is already closed, then it would have been automatically been removed from the epoll set */ - if (!is_fd_closed) { - err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); - if (err < 0 && errno != ENOENT) { - gpr_asprintf( - &err_msg, - "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", - pi->epoll_fd, fd->fd, errno, strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } + err = epoll_ctl(pi->epoll_fd, EPOLL_CTL_DEL, fd->fd, nullptr); + if (err < 0 && errno != ENOENT) { + gpr_asprintf( + &err_msg, + "epoll_ctl (epoll_fd: %d) del fd: %d failed with error: %d (%s)", + pi->epoll_fd, fd->fd, errno, strerror(errno)); + append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); + gpr_free(err_msg); } for (i = 0; i < pi->fd_cnt; i++) { @@ -769,6 +773,7 @@ static void unref_by(grpc_fd* fd, int n) { fd->read_closure->DestroyEvent(); fd->write_closure->DestroyEvent(); + fd->error_closure->DestroyEvent(); gpr_mu_unlock(&fd_freelist_mu); } else { @@ -806,7 +811,7 @@ static void fd_global_shutdown(void) { gpr_mu_destroy(&fd_freelist_mu); } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { grpc_fd* new_fd = nullptr; gpr_mu_lock(&fd_freelist_mu); @@ -821,6 +826,7 @@ static grpc_fd* fd_create(int fd, const char* name) { gpr_mu_init(&new_fd->po.mu); new_fd->read_closure.Init(); new_fd->write_closure.Init(); + new_fd->error_closure.Init(); } /* Note: It is not really needed to get the new_fd->po.mu lock here. If this @@ -837,6 +843,8 @@ static grpc_fd* fd_create(int fd, const char* name) { new_fd->orphaned = false; new_fd->read_closure->InitEvent(); new_fd->write_closure->InitEvent(); + new_fd->error_closure->InitEvent(); + new_fd->track_err = track_err; gpr_atm_no_barrier_store(&new_fd->read_notifier_pollset, (gpr_atm)NULL); new_fd->freelist_next = nullptr; @@ -863,7 +871,7 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { grpc_error* error = GRPC_ERROR_NONE; polling_island* unref_pi = nullptr; @@ -884,7 +892,7 @@ static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, before doing this.) */ if (fd->po.pi != nullptr) { polling_island* pi_latest = polling_island_lock(fd->po.pi); - polling_island_remove_fd_locked(pi_latest, fd, already_closed, &error); + polling_island_remove_fd_locked(pi_latest, fd, &error); gpr_mu_unlock(&pi_latest->mu); unref_pi = fd->po.pi; @@ -933,6 +941,7 @@ static void fd_shutdown(grpc_fd* fd, grpc_error* why) { if (fd->read_closure->SetShutdown(GRPC_ERROR_REF(why))) { shutdown(fd->fd, SHUT_RDWR); fd->write_closure->SetShutdown(GRPC_ERROR_REF(why)); + fd->error_closure->SetShutdown(GRPC_ERROR_REF(why)); } GRPC_ERROR_UNREF(why); } @@ -945,6 +954,10 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { fd->write_closure->NotifyOn(closure); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + fd->error_closure->NotifyOn(closure); +} + /******************************************************************************* * Pollset Definitions */ @@ -1116,6 +1129,8 @@ static void fd_become_readable(grpc_fd* fd, grpc_pollset* notifier) { static void fd_become_writable(grpc_fd* fd) { fd->write_closure->SetReady(); } +static void fd_has_errors(grpc_fd* fd) { fd->error_closure->SetReady(); } + static void pollset_release_polling_island(grpc_pollset* ps, const char* reason) { if (ps->po.pi != nullptr) { @@ -1254,14 +1269,23 @@ static void pollset_work_and_unlock(grpc_pollset* pollset, to the function pollset_work_and_unlock() will pick up the correct epoll_fd */ } else { - grpc_fd* fd = static_cast<grpc_fd*>(data_ptr); - int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP); - int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI); - int write_ev = ep_ev[i].events & EPOLLOUT; - if (read_ev || cancel) { + grpc_fd* fd = reinterpret_cast<grpc_fd*>( + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1)); + bool track_err = + reinterpret_cast<intptr_t>(data_ptr) & ~static_cast<intptr_t>(1); + bool cancel = (ep_ev[i].events & EPOLLHUP) != 0; + bool error = (ep_ev[i].events & EPOLLERR) != 0; + bool read_ev = (ep_ev[i].events & (EPOLLIN | EPOLLPRI)) != 0; + bool write_ev = (ep_ev[i].events & EPOLLOUT) != 0; + bool err_fallback = error && !track_err; + + if (error && !err_fallback) { + fd_has_errors(fd); + } + if (read_ev || cancel || err_fallback) { fd_become_readable(fd, pollset); } - if (write_ev || cancel) { + if (write_ev || cancel || err_fallback) { fd_become_writable(fd); } } @@ -1634,6 +1658,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + true, fd_create, fd_wrapped_fd, @@ -1641,6 +1666,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_poll_posix.cc b/src/core/lib/iomgr/ev_poll_posix.cc index 70958ed562..c9c09881a2 100644 --- a/src/core/lib/iomgr/ev_poll_posix.cc +++ b/src/core/lib/iomgr/ev_poll_posix.cc @@ -330,7 +330,8 @@ static void unref_by(grpc_fd* fd, int n) { } } -static grpc_fd* fd_create(int fd, const char* name) { +static grpc_fd* fd_create(int fd, const char* name, bool track_err) { + GPR_DEBUG_ASSERT(track_err == false); grpc_fd* r = static_cast<grpc_fd*>(gpr_malloc(sizeof(*r))); gpr_mu_init(&r->mu); gpr_atm_rel_store(&r->refst, 1); @@ -424,14 +425,12 @@ static int fd_wrapped_fd(grpc_fd* fd) { } static void fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { + const char* reason) { fd->on_done_closure = on_done; fd->released = release_fd != nullptr; if (release_fd != nullptr) { *release_fd = fd->fd; fd->released = true; - } else if (already_closed) { - fd->released = true; } gpr_mu_lock(&fd->mu); REF_BY(fd, 1, reason); /* remove active status, but keep referenced */ @@ -553,6 +552,11 @@ static void fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { gpr_mu_unlock(&fd->mu); } +static void fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + gpr_log(GPR_ERROR, "Polling engine does not support tracking errors."); + abort(); +} + static uint32_t fd_begin_poll(grpc_fd* fd, grpc_pollset* pollset, grpc_pollset_worker* worker, uint32_t read_mask, uint32_t write_mask, grpc_fd_watcher* watcher) { @@ -1710,6 +1714,7 @@ static void shutdown_engine(void) { static const grpc_event_engine_vtable vtable = { sizeof(grpc_pollset), + false, fd_create, fd_wrapped_fd, @@ -1717,6 +1722,7 @@ static const grpc_event_engine_vtable vtable = { fd_shutdown, fd_notify_on_read, fd_notify_on_write, + fd_notify_on_error, fd_is_shutdown, fd_get_read_notifier_pollset, diff --git a/src/core/lib/iomgr/ev_posix.cc b/src/core/lib/iomgr/ev_posix.cc index 6b7eca0afa..1139b3273a 100644 --- a/src/core/lib/iomgr/ev_posix.cc +++ b/src/core/lib/iomgr/ev_posix.cc @@ -193,10 +193,15 @@ void grpc_event_engine_shutdown(void) { g_event_engine = nullptr; } -grpc_fd* grpc_fd_create(int fd, const char* name) { - GRPC_POLLING_API_TRACE("fd_create(%d, %s)", fd, name); - GRPC_FD_TRACE("fd_create(%d, %s)", fd, name); - return g_event_engine->fd_create(fd, name); +bool grpc_event_engine_can_track_errors(void) { + return g_event_engine->can_track_err; +} + +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err) { + GRPC_POLLING_API_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GRPC_FD_TRACE("fd_create(%d, %s, %d)", fd, name, track_err); + GPR_DEBUG_ASSERT(!track_err || g_event_engine->can_track_err); + return g_event_engine->fd_create(fd, name, track_err); } int grpc_fd_wrapped_fd(grpc_fd* fd) { @@ -204,13 +209,12 @@ int grpc_fd_wrapped_fd(grpc_fd* fd) { } void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason) { - GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %d, %s)", - grpc_fd_wrapped_fd(fd), on_done, release_fd, - already_closed, reason); + const char* reason) { + GRPC_POLLING_API_TRACE("fd_orphan(%d, %p, %p, %s)", grpc_fd_wrapped_fd(fd), + on_done, release_fd, reason); GRPC_FD_TRACE("grpc_fd_orphan, fd:%d closed", grpc_fd_wrapped_fd(fd)); - g_event_engine->fd_orphan(fd, on_done, release_fd, already_closed, reason); + g_event_engine->fd_orphan(fd, on_done, release_fd, reason); } void grpc_fd_shutdown(grpc_fd* fd, grpc_error* why) { @@ -231,6 +235,10 @@ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure) { g_event_engine->fd_notify_on_write(fd, closure); } +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure) { + g_event_engine->fd_notify_on_error(fd, closure); +} + static size_t pollset_size(void) { return g_event_engine->pollset_size; } static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 82cbce9a7b..b4c17fc80d 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -41,14 +41,16 @@ typedef struct grpc_fd grpc_fd; typedef struct grpc_event_engine_vtable { size_t pollset_size; + bool can_track_err; - grpc_fd* (*fd_create)(int fd, const char* name); + grpc_fd* (*fd_create)(int fd, const char* name, bool track_err); int (*fd_wrapped_fd)(grpc_fd* fd); void (*fd_orphan)(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); void (*fd_shutdown)(grpc_fd* fd, grpc_error* why); void (*fd_notify_on_read)(grpc_fd* fd, grpc_closure* closure); void (*fd_notify_on_write)(grpc_fd* fd, grpc_closure* closure); + void (*fd_notify_on_error)(grpc_fd* fd, grpc_closure* closure); bool (*fd_is_shutdown)(grpc_fd* fd); grpc_pollset* (*fd_get_read_notifier_pollset)(grpc_fd* fd); @@ -84,10 +86,20 @@ void grpc_event_engine_shutdown(void); /* Return the name of the poll strategy */ const char* grpc_get_poll_strategy_name(); +/* Returns true if polling engine can track errors separately, false otherwise. + * If this is true, fd can be created with track_err set. After this, error + * events will be reported using fd_notify_on_error. If it is not set, errors + * will continue to be reported through fd_notify_on_read and + * fd_notify_on_write. + */ +bool grpc_event_engine_can_track_errors(); + /* Create a wrapped file descriptor. Requires fd is a non-blocking file descriptor. + \a track_err if true means that error events would be tracked separately + using grpc_fd_notify_on_error. Currently, valid only for linux systems. This takes ownership of closing fd. */ -grpc_fd* grpc_fd_create(int fd, const char* name); +grpc_fd* grpc_fd_create(int fd, const char* name, bool track_err); /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd* fd); @@ -100,7 +112,7 @@ int grpc_fd_wrapped_fd(grpc_fd* fd); notify_on_write. MUST NOT be called with a pollset lock taken */ void grpc_fd_orphan(grpc_fd* fd, grpc_closure* on_done, int* release_fd, - bool already_closed, const char* reason); + const char* reason); /* Has grpc_fd_shutdown been called on an fd? */ bool grpc_fd_is_shutdown(grpc_fd* fd); @@ -126,6 +138,10 @@ void grpc_fd_notify_on_read(grpc_fd* fd, grpc_closure* closure); /* Exactly the same semantics as above, except based on writable events. */ void grpc_fd_notify_on_write(grpc_fd* fd, grpc_closure* closure); +/* Exactly the same semantics as above, except based on error events. track_err + * needs to have been set on grpc_fd_create */ +void grpc_fd_notify_on_error(grpc_fd* fd, grpc_closure* closure); + /* Return the read notifier pollset from the fd */ grpc_pollset* grpc_fd_get_read_notifier_pollset(grpc_fd* fd); diff --git a/src/core/lib/iomgr/tcp_client_cfstream.cc b/src/core/lib/iomgr/tcp_client_cfstream.cc index ffed3bbef6..5acea91792 100644 --- a/src/core/lib/iomgr/tcp_client_cfstream.cc +++ b/src/core/lib/iomgr/tcp_client_cfstream.cc @@ -52,7 +52,7 @@ typedef struct CFStreamConnect { CFReadStreamRef read_stream; CFWriteStreamRef write_stream; - CFStreamHandle* stream_sync; + CFStreamHandle* stream_handle; grpc_timer alarm; grpc_closure on_alarm; @@ -71,7 +71,7 @@ typedef struct CFStreamConnect { static void CFStreamConnectCleanup(CFStreamConnect* connect) { grpc_resource_quota_unref_internal(connect->resource_quota); - CFSTREAM_HANDLE_UNREF(connect->stream_sync, "async connect clean up"); + CFSTREAM_HANDLE_UNREF(connect->stream_handle, "async connect clean up"); CFRelease(connect->read_stream); CFRelease(connect->write_stream); gpr_mu_destroy(&connect->mu); @@ -131,7 +131,7 @@ static void OnOpen(void* arg, grpc_error* error) { if (error == GRPC_ERROR_NONE) { *endpoint = grpc_cfstream_endpoint_create( connect->read_stream, connect->write_stream, connect->addr_name, - connect->resource_quota, connect->stream_sync); + connect->resource_quota, connect->stream_handle); } } else { GRPC_ERROR_REF(error); @@ -170,8 +170,8 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, gpr_mu_init(&connect->mu); if (grpc_tcp_trace.enabled()) { - gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: asynchronously connecting", - connect->addr_name); + gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %p, %s: asynchronously connecting", + connect, connect->addr_name); } grpc_resource_quota* resource_quota = grpc_resource_quota_create(NULL); @@ -197,11 +197,11 @@ static void CFStreamClientConnect(grpc_closure* closure, grpc_endpoint** ep, CFRelease(host); connect->read_stream = read_stream; connect->write_stream = write_stream; - connect->stream_sync = + connect->stream_handle = CFStreamHandle::CreateStreamHandle(read_stream, write_stream); GRPC_CLOSURE_INIT(&connect->on_open, OnOpen, static_cast<void*>(connect), grpc_schedule_on_exec_ctx); - connect->stream_sync->NotifyOnOpen(&connect->on_open); + connect->stream_handle->NotifyOnOpen(&connect->on_open); GRPC_CLOSURE_INIT(&connect->on_alarm, OnAlarm, connect, grpc_schedule_on_exec_ctx); gpr_mu_lock(&connect->mu); diff --git a/src/core/lib/iomgr/tcp_client_posix.cc b/src/core/lib/iomgr/tcp_client_posix.cc index 39da7f1637..296ee74311 100644 --- a/src/core/lib/iomgr/tcp_client_posix.cc +++ b/src/core/lib/iomgr/tcp_client_posix.cc @@ -211,8 +211,7 @@ static void on_writable(void* acp, grpc_error* error) { finish: if (fd != nullptr) { grpc_pollset_set_del_fd(ac->interested_parties, fd); - grpc_fd_orphan(fd, nullptr, nullptr, false /* already_closed */, - "tcp_client_orphan"); + grpc_fd_orphan(fd, nullptr, nullptr, "tcp_client_orphan"); fd = nullptr; } done = (--ac->refs == 0); @@ -280,7 +279,7 @@ grpc_error* grpc_tcp_client_prepare_fd(const grpc_channel_args* channel_args, } addr_str = grpc_sockaddr_to_uri(mapped_addr); gpr_asprintf(&name, "tcp-client:%s", addr_str); - *fdobj = grpc_fd_create(fd, name); + *fdobj = grpc_fd_create(fd, name, false); gpr_free(name); gpr_free(addr_str); return GRPC_ERROR_NONE; @@ -305,8 +304,7 @@ void grpc_tcp_client_create_from_prepared_fd( return; } if (errno != EWOULDBLOCK && errno != EINPROGRESS) { - grpc_fd_orphan(fdobj, nullptr, nullptr, false /* already_closed */, - "tcp_client_connect_error"); + grpc_fd_orphan(fdobj, nullptr, nullptr, "tcp_client_connect_error"); GRPC_CLOSURE_SCHED(closure, GRPC_OS_ERROR(errno, "connect")); return; } diff --git a/src/core/lib/iomgr/tcp_posix.cc b/src/core/lib/iomgr/tcp_posix.cc index 43d545846d..9df2e206b2 100644 --- a/src/core/lib/iomgr/tcp_posix.cc +++ b/src/core/lib/iomgr/tcp_posix.cc @@ -297,7 +297,7 @@ static void tcp_shutdown(grpc_endpoint* ep, grpc_error* why) { static void tcp_free(grpc_tcp* tcp) { grpc_fd_orphan(tcp->em_fd, tcp->release_fd_cb, tcp->release_fd, - false /* already_closed */, "tcp_unref_orphan"); + "tcp_unref_orphan"); grpc_slice_buffer_destroy_internal(&tcp->last_read_buffer); grpc_resource_user_unref(tcp->resource_user); gpr_free(tcp->peer_string); diff --git a/src/core/lib/iomgr/tcp_server_posix.cc b/src/core/lib/iomgr/tcp_server_posix.cc index 0a5caca906..8ddf684fea 100644 --- a/src/core/lib/iomgr/tcp_server_posix.cc +++ b/src/core/lib/iomgr/tcp_server_posix.cc @@ -150,7 +150,7 @@ static void deactivated_all_ports(grpc_tcp_server* s) { GRPC_CLOSURE_INIT(&sp->destroyed_closure, destroyed_port, s, grpc_schedule_on_exec_ctx); grpc_fd_orphan(sp->emfd, &sp->destroyed_closure, nullptr, - false /* already_closed */, "tcp_listener_shutdown"); + "tcp_listener_shutdown"); } gpr_mu_unlock(&s->mu); } else { @@ -226,7 +226,7 @@ static void on_read(void* arg, grpc_error* err) { gpr_log(GPR_INFO, "SERVER_CONNECT: incoming connection: %s", addr_str); } - grpc_fd* fdobj = grpc_fd_create(fd, name); + grpc_fd* fdobj = grpc_fd_create(fd, name, false); read_notifier_pollset = sp->server->pollsets[static_cast<size_t>(gpr_atm_no_barrier_fetch_add( @@ -362,7 +362,7 @@ static grpc_error* clone_port(grpc_tcp_listener* listener, unsigned count) { listener->sibling = sp; sp->server = listener->server; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, &listener->addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = listener->port_index; diff --git a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc index 73afa15e65..b9f8145572 100644 --- a/src/core/lib/iomgr/tcp_server_utils_posix_common.cc +++ b/src/core/lib/iomgr/tcp_server_utils_posix_common.cc @@ -105,7 +105,7 @@ static grpc_error* add_socket_to_server(grpc_tcp_server* s, int fd, s->tail = sp; sp->server = s; sp->fd = fd; - sp->emfd = grpc_fd_create(fd, name); + sp->emfd = grpc_fd_create(fd, name, false); memcpy(&sp->addr, addr, sizeof(grpc_resolved_address)); sp->port = port; sp->port_index = port_index; diff --git a/src/core/lib/iomgr/udp_server.cc b/src/core/lib/iomgr/udp_server.cc index 51d17eb174..bdb2d0e764 100644 --- a/src/core/lib/iomgr/udp_server.cc +++ b/src/core/lib/iomgr/udp_server.cc @@ -152,7 +152,7 @@ GrpcUdpListener::GrpcUdpListener(grpc_udp_server* server, int fd, grpc_sockaddr_to_string(&addr_str, addr, 1); gpr_asprintf(&name, "udp-server-listener:%s", addr_str); gpr_free(addr_str); - emfd_ = grpc_fd_create(fd, name); + emfd_ = grpc_fd_create(fd, name, false); memcpy(&addr_, addr, sizeof(grpc_resolved_address)); GPR_ASSERT(emfd_); gpr_free(name); @@ -300,8 +300,7 @@ void GrpcUdpListener::OrphanFd() { grpc_schedule_on_exec_ctx); /* Because at this point, all listening sockets have been shutdown already, no * need to call OnFdAboutToOrphan() to notify the handler again. */ - grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, - false /* already_closed */, "udp_listener_shutdown"); + grpc_fd_orphan(emfd_, &destroyed_closure_, nullptr, "udp_listener_shutdown"); } void grpc_udp_server_destroy(grpc_udp_server* s, grpc_closure* on_done) { diff --git a/src/csharp/Grpc.Core.Testing/TestCalls.cs b/src/csharp/Grpc.Core.Testing/TestCalls.cs index ac29a8b974..8c76781bbe 100644 --- a/src/csharp/Grpc.Core.Testing/TestCalls.cs +++ b/src/csharp/Grpc.Core.Testing/TestCalls.cs @@ -65,7 +65,7 @@ namespace Grpc.Core.Testing /// Creates a test double for <c>AsyncDuplexStreamingCall</c>. Only for testing. /// Note: experimental API that can change or be removed without any prior notice. /// </summary> - public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TResponse, TRequest>( + public static AsyncDuplexStreamingCall<TRequest, TResponse> AsyncDuplexStreamingCall<TRequest, TResponse>( IClientStreamWriter<TRequest> requestStream, IAsyncStreamReader<TResponse> responseStream, Task<Metadata> responseHeadersAsync, Func<Status> getStatusFunc, Func<Metadata> getTrailersFunc, Action disposeAction) diff --git a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj index 3ccc9adfaf..d2cc5bbc65 100755 --- a/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj +++ b/src/csharp/Grpc.Examples.Tests/Grpc.Examples.Tests.csproj @@ -13,11 +13,13 @@ <ItemGroup> <ProjectReference Include="../Grpc.Examples/Grpc.Examples.csproj" /> + <ProjectReference Include="../Grpc.Core.Testing/Grpc.Core.Testing.csproj" /> </ItemGroup> <ItemGroup> <PackageReference Include="NUnit" Version="3.6.0" /> <PackageReference Include="NUnitLite" Version="3.6.0" /> + <PackageReference Include="Moq" Version="4.8.2" /> </ItemGroup> <ItemGroup Condition=" '$(TargetFramework)' == 'net45' "> diff --git a/src/csharp/Grpc.Examples.Tests/MathClientMockableTest.cs b/src/csharp/Grpc.Examples.Tests/MathClientMockableTest.cs new file mode 100644 index 0000000000..6ed7b0a937 --- /dev/null +++ b/src/csharp/Grpc.Examples.Tests/MathClientMockableTest.cs @@ -0,0 +1,101 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Testing; +using NUnit.Framework; + +namespace Math.Tests +{ + /// <summary> + /// Demonstrates how to mock method stubs for all method types in a generated client. + /// </summary> + public class MathClientMockableTest + { + [Test] + public void ClientBaseBlockingUnaryCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + + var expected = new DivReply(); + mockClient.Setup(m => m.Div(Moq.It.IsAny<DivArgs>(), null, null, CancellationToken.None)).Returns(expected); + Assert.AreSame(expected, mockClient.Object.Div(new DivArgs())); + } + + [Test] + public void ClientBaseBlockingUnaryCallWithCallOptionsCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + + var expected = new DivReply(); + mockClient.Setup(m => m.Div(Moq.It.IsAny<DivArgs>(), Moq.It.IsAny<CallOptions>())).Returns(expected); + Assert.AreSame(expected, mockClient.Object.Div(new DivArgs(), new CallOptions())); + } + + [Test] + public void ClientBaseAsyncUnaryCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + + // Use a factory method provided by Grpc.Core.Testing.TestCalls to create an instance of a call. + var fakeCall = TestCalls.AsyncUnaryCall<DivReply>(Task.FromResult(new DivReply()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + mockClient.Setup(m => m.DivAsync(Moq.It.IsAny<DivArgs>(), null, null, CancellationToken.None)).Returns(fakeCall); + Assert.AreSame(fakeCall, mockClient.Object.DivAsync(new DivArgs())); + } + + [Test] + public void ClientBaseClientStreamingCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + var mockRequestStream = new Moq.Mock<IClientStreamWriter<Num>>(); + + // Use a factory method provided by Grpc.Core.Testing.TestCalls to create an instance of a call. + var fakeCall = TestCalls.AsyncClientStreamingCall<Num, Num>(mockRequestStream.Object, Task.FromResult(new Num()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + mockClient.Setup(m => m.Sum(null, null, CancellationToken.None)).Returns(fakeCall); + Assert.AreSame(fakeCall, mockClient.Object.Sum()); + } + + [Test] + public void ClientBaseServerStreamingCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + var mockResponseStream = new Moq.Mock<IAsyncStreamReader<Num>>(); + + // Use a factory method provided by Grpc.Core.Testing.TestCalls to create an instance of a call. + var fakeCall = TestCalls.AsyncServerStreamingCall<Num>(mockResponseStream.Object, Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + mockClient.Setup(m => m.Fib(Moq.It.IsAny<FibArgs>(), null, null, CancellationToken.None)).Returns(fakeCall); + Assert.AreSame(fakeCall, mockClient.Object.Fib(new FibArgs())); + } + + [Test] + public void ClientBaseDuplexStreamingCallCanBeMocked() + { + var mockClient = new Moq.Mock<Math.MathClient>(); + var mockRequestStream = new Moq.Mock<IClientStreamWriter<DivArgs>>(); + var mockResponseStream = new Moq.Mock<IAsyncStreamReader<DivReply>>(); + + // Use a factory method provided by Grpc.Core.Testing.TestCalls to create an instance of a call. + var fakeCall = TestCalls.AsyncDuplexStreamingCall<DivArgs, DivReply>(mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { }); + mockClient.Setup(m => m.DivMany(null, null, CancellationToken.None)).Returns(fakeCall); + Assert.AreSame(fakeCall, mockClient.Object.DivMany()); + } + } +} diff --git a/src/csharp/Grpc.Examples.Tests/MathServiceImplTestabilityTest.cs b/src/csharp/Grpc.Examples.Tests/MathServiceImplTestabilityTest.cs new file mode 100644 index 0000000000..7c5fb94408 --- /dev/null +++ b/src/csharp/Grpc.Examples.Tests/MathServiceImplTestabilityTest.cs @@ -0,0 +1,47 @@ +#region Copyright notice and license + +// Copyright 2018 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#endregion + +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using Grpc.Core.Testing; +using Grpc.Core.Utils; +using NUnit.Framework; + +namespace Math.Tests +{ + /// <summary> + /// Demonstrates how to unit test implementations of generated server stubs. + /// </summary> + public class MathServiceImplTestabilityTest + { + [Test] + public async Task ServerCallImplIsTestable() + { + var mathImpl = new MathServiceImpl(); + + // Use a factory method provided by Grpc.Core.Testing.TestServerCallContext to create an instance of server call context. + // This allows testing even those server-side implementations that rely on the contents of ServerCallContext. + var fakeServerCallContext = TestServerCallContext.Create("fooMethod", null, DateTime.UtcNow.AddHours(1), new Metadata(), CancellationToken.None, "127.0.0.1", null, null, (metadata) => TaskUtils.CompletedTask, () => new WriteOptions(), (writeOptions) => { }); + var response = await mathImpl.Div(new DivArgs { Dividend = 10, Divisor = 2 }, fakeServerCallContext); + Assert.AreEqual(5, response.Quotient); + Assert.AreEqual(0, response.Remainder); + } + } +} diff --git a/src/csharp/Grpc.Examples/MathGrpc.cs b/src/csharp/Grpc.Examples/MathGrpc.cs index a4739da81d..9578bb4d81 100644 --- a/src/csharp/Grpc.Examples/MathGrpc.cs +++ b/src/csharp/Grpc.Examples/MathGrpc.cs @@ -27,38 +27,38 @@ namespace Math { { static readonly string __ServiceName = "math.Math"; - static readonly grpc::Marshaller<global::Math.DivArgs> __Marshaller_DivArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.DivArgs.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Math.DivReply> __Marshaller_DivReply = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.DivReply.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Math.FibArgs> __Marshaller_FibArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.FibArgs.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Math.Num> __Marshaller_Num = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.Num.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Math.DivArgs> __Marshaller_math_DivArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.DivArgs.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Math.DivReply> __Marshaller_math_DivReply = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.DivReply.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Math.FibArgs> __Marshaller_math_FibArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.FibArgs.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Math.Num> __Marshaller_math_Num = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Math.Num.Parser.ParseFrom); static readonly grpc::Method<global::Math.DivArgs, global::Math.DivReply> __Method_Div = new grpc::Method<global::Math.DivArgs, global::Math.DivReply>( grpc::MethodType.Unary, __ServiceName, "Div", - __Marshaller_DivArgs, - __Marshaller_DivReply); + __Marshaller_math_DivArgs, + __Marshaller_math_DivReply); static readonly grpc::Method<global::Math.DivArgs, global::Math.DivReply> __Method_DivMany = new grpc::Method<global::Math.DivArgs, global::Math.DivReply>( grpc::MethodType.DuplexStreaming, __ServiceName, "DivMany", - __Marshaller_DivArgs, - __Marshaller_DivReply); + __Marshaller_math_DivArgs, + __Marshaller_math_DivReply); static readonly grpc::Method<global::Math.FibArgs, global::Math.Num> __Method_Fib = new grpc::Method<global::Math.FibArgs, global::Math.Num>( grpc::MethodType.ServerStreaming, __ServiceName, "Fib", - __Marshaller_FibArgs, - __Marshaller_Num); + __Marshaller_math_FibArgs, + __Marshaller_math_Num); static readonly grpc::Method<global::Math.Num, global::Math.Num> __Method_Sum = new grpc::Method<global::Math.Num, global::Math.Num>( grpc::MethodType.ClientStreaming, __ServiceName, "Sum", - __Marshaller_Num, - __Marshaller_Num); + __Marshaller_math_Num, + __Marshaller_math_Num); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs index ebd890e48d..5e79c04d2a 100644 --- a/src/csharp/Grpc.HealthCheck/HealthGrpc.cs +++ b/src/csharp/Grpc.HealthCheck/HealthGrpc.cs @@ -30,15 +30,15 @@ namespace Grpc.Health.V1 { { static readonly string __ServiceName = "grpc.health.v1.Health"; - static readonly grpc::Marshaller<global::Grpc.Health.V1.HealthCheckRequest> __Marshaller_HealthCheckRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Health.V1.HealthCheckRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Health.V1.HealthCheckResponse> __Marshaller_HealthCheckResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Health.V1.HealthCheckResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Health.V1.HealthCheckRequest> __Marshaller_grpc_health_v1_HealthCheckRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Health.V1.HealthCheckRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Health.V1.HealthCheckResponse> __Marshaller_grpc_health_v1_HealthCheckResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Health.V1.HealthCheckResponse.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Health.V1.HealthCheckRequest, global::Grpc.Health.V1.HealthCheckResponse> __Method_Check = new grpc::Method<global::Grpc.Health.V1.HealthCheckRequest, global::Grpc.Health.V1.HealthCheckResponse>( grpc::MethodType.Unary, __ServiceName, "Check", - __Marshaller_HealthCheckRequest, - __Marshaller_HealthCheckResponse); + __Marshaller_grpc_health_v1_HealthCheckRequest, + __Marshaller_grpc_health_v1_HealthCheckResponse); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceGrpc.cs b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceGrpc.cs index e2a4b93cef..b5738593f2 100644 --- a/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/BenchmarkServiceGrpc.cs @@ -29,43 +29,43 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.BenchmarkService"; - static readonly grpc::Marshaller<global::Grpc.Testing.SimpleRequest> __Marshaller_SimpleRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.SimpleResponse> __Marshaller_SimpleResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.SimpleRequest> __Marshaller_grpc_testing_SimpleRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.SimpleResponse> __Marshaller_grpc_testing_SimpleResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleResponse.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_UnaryCall = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.Unary, __ServiceName, "UnaryCall", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_StreamingCall = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.DuplexStreaming, __ServiceName, "StreamingCall", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_StreamingFromClient = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.ClientStreaming, __ServiceName, "StreamingFromClient", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_StreamingFromServer = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.ServerStreaming, __ServiceName, "StreamingFromServer", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_StreamingBothWays = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.DuplexStreaming, __ServiceName, "StreamingBothWays", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.IntegrationTesting/GeneratedClientTest.cs b/src/csharp/Grpc.IntegrationTesting/GeneratedClientTest.cs index 2462606879..c8bcb7a93f 100644 --- a/src/csharp/Grpc.IntegrationTesting/GeneratedClientTest.cs +++ b/src/csharp/Grpc.IntegrationTesting/GeneratedClientTest.cs @@ -34,29 +34,6 @@ namespace Grpc.IntegrationTesting TestService.TestServiceClient unimplementedClient = new UnimplementedTestServiceClient(); [Test] - public void ExpandedParamOverloadCanBeMocked() - { - var expected = new SimpleResponse(); - - var mockClient = new Moq.Mock<TestService.TestServiceClient>(); - // mocking is relatively clumsy because one needs to specify value for all the optional params. - mockClient.Setup(m => m.UnaryCall(Moq.It.IsAny<SimpleRequest>(), null, null, CancellationToken.None)).Returns(expected); - - Assert.AreSame(expected, mockClient.Object.UnaryCall(new SimpleRequest())); - } - - [Test] - public void CallOptionsOverloadCanBeMocked() - { - var expected = new SimpleResponse(); - - var mockClient = new Moq.Mock<TestService.TestServiceClient>(); - mockClient.Setup(m => m.UnaryCall(Moq.It.IsAny<SimpleRequest>(), Moq.It.IsAny<CallOptions>())).Returns(expected); - - Assert.AreSame(expected, mockClient.Object.UnaryCall(new SimpleRequest(), new CallOptions())); - } - - [Test] public void DefaultMethodStubThrows_UnaryCall() { Assert.Throws(typeof(NotImplementedException), () => unimplementedClient.UnaryCall(new SimpleRequest())); diff --git a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj index ba2107a576..e4f36d8810 100755 --- a/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj +++ b/src/csharp/Grpc.IntegrationTesting/Grpc.IntegrationTesting.csproj @@ -19,7 +19,6 @@ <ItemGroup> <PackageReference Include="Google.Protobuf" Version="$(GoogleProtobufVersion)" /> <PackageReference Include="CommandLineParser" Version="2.1.1-beta" /> - <PackageReference Include="Moq" Version="4.8.2" /> <PackageReference Include="NUnit" Version="3.6.0" /> <PackageReference Include="NUnitLite" Version="3.6.0" /> </ItemGroup> diff --git a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs index e8c566e167..9f16f41ac1 100644 --- a/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/MetricsGrpc.cs @@ -33,23 +33,23 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.MetricsService"; - static readonly grpc::Marshaller<global::Grpc.Testing.EmptyMessage> __Marshaller_EmptyMessage = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.EmptyMessage.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.GaugeResponse> __Marshaller_GaugeResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeResponse.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.GaugeRequest> __Marshaller_GaugeRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.EmptyMessage> __Marshaller_grpc_testing_EmptyMessage = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.EmptyMessage.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.GaugeResponse> __Marshaller_grpc_testing_GaugeResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.GaugeRequest> __Marshaller_grpc_testing_GaugeRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.GaugeRequest.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.EmptyMessage, global::Grpc.Testing.GaugeResponse> __Method_GetAllGauges = new grpc::Method<global::Grpc.Testing.EmptyMessage, global::Grpc.Testing.GaugeResponse>( grpc::MethodType.ServerStreaming, __ServiceName, "GetAllGauges", - __Marshaller_EmptyMessage, - __Marshaller_GaugeResponse); + __Marshaller_grpc_testing_EmptyMessage, + __Marshaller_grpc_testing_GaugeResponse); static readonly grpc::Method<global::Grpc.Testing.GaugeRequest, global::Grpc.Testing.GaugeResponse> __Method_GetGauge = new grpc::Method<global::Grpc.Testing.GaugeRequest, global::Grpc.Testing.GaugeResponse>( grpc::MethodType.Unary, __ServiceName, "GetGauge", - __Marshaller_GaugeRequest, - __Marshaller_GaugeResponse); + __Marshaller_grpc_testing_GaugeRequest, + __Marshaller_grpc_testing_GaugeResponse); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.IntegrationTesting/ReportQpsScenarioServiceGrpc.cs b/src/csharp/Grpc.IntegrationTesting/ReportQpsScenarioServiceGrpc.cs index 60a3890f21..1da0548cb4 100644 --- a/src/csharp/Grpc.IntegrationTesting/ReportQpsScenarioServiceGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/ReportQpsScenarioServiceGrpc.cs @@ -29,15 +29,15 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.ReportQpsScenarioService"; - static readonly grpc::Marshaller<global::Grpc.Testing.ScenarioResult> __Marshaller_ScenarioResult = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ScenarioResult.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.Void> __Marshaller_Void = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Void.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ScenarioResult> __Marshaller_grpc_testing_ScenarioResult = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ScenarioResult.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.Void> __Marshaller_grpc_testing_Void = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Void.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.ScenarioResult, global::Grpc.Testing.Void> __Method_ReportScenario = new grpc::Method<global::Grpc.Testing.ScenarioResult, global::Grpc.Testing.Void>( grpc::MethodType.Unary, __ServiceName, "ReportScenario", - __Marshaller_ScenarioResult, - __Marshaller_Void); + __Marshaller_grpc_testing_ScenarioResult, + __Marshaller_grpc_testing_Void); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs index aec4ce7be7..2176916b43 100644 --- a/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/TestGrpc.cs @@ -34,69 +34,69 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.TestService"; - static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.SimpleRequest> __Marshaller_SimpleRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.SimpleResponse> __Marshaller_SimpleResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleResponse.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.StreamingOutputCallRequest> __Marshaller_StreamingOutputCallRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingOutputCallRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.StreamingOutputCallResponse> __Marshaller_StreamingOutputCallResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingOutputCallResponse.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.StreamingInputCallRequest> __Marshaller_StreamingInputCallRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingInputCallRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.StreamingInputCallResponse> __Marshaller_StreamingInputCallResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingInputCallResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_grpc_testing_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.SimpleRequest> __Marshaller_grpc_testing_SimpleRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.SimpleResponse> __Marshaller_grpc_testing_SimpleResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.SimpleResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.StreamingOutputCallRequest> __Marshaller_grpc_testing_StreamingOutputCallRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingOutputCallRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.StreamingOutputCallResponse> __Marshaller_grpc_testing_StreamingOutputCallResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingOutputCallResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.StreamingInputCallRequest> __Marshaller_grpc_testing_StreamingInputCallRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingInputCallRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.StreamingInputCallResponse> __Marshaller_grpc_testing_StreamingInputCallResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.StreamingInputCallResponse.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_EmptyCall = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>( grpc::MethodType.Unary, __ServiceName, "EmptyCall", - __Marshaller_Empty, - __Marshaller_Empty); + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_Empty); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_UnaryCall = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.Unary, __ServiceName, "UnaryCall", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse> __Method_CacheableUnaryCall = new grpc::Method<global::Grpc.Testing.SimpleRequest, global::Grpc.Testing.SimpleResponse>( grpc::MethodType.Unary, __ServiceName, "CacheableUnaryCall", - __Marshaller_SimpleRequest, - __Marshaller_SimpleResponse); + __Marshaller_grpc_testing_SimpleRequest, + __Marshaller_grpc_testing_SimpleResponse); static readonly grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> __Method_StreamingOutputCall = new grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse>( grpc::MethodType.ServerStreaming, __ServiceName, "StreamingOutputCall", - __Marshaller_StreamingOutputCallRequest, - __Marshaller_StreamingOutputCallResponse); + __Marshaller_grpc_testing_StreamingOutputCallRequest, + __Marshaller_grpc_testing_StreamingOutputCallResponse); static readonly grpc::Method<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse> __Method_StreamingInputCall = new grpc::Method<global::Grpc.Testing.StreamingInputCallRequest, global::Grpc.Testing.StreamingInputCallResponse>( grpc::MethodType.ClientStreaming, __ServiceName, "StreamingInputCall", - __Marshaller_StreamingInputCallRequest, - __Marshaller_StreamingInputCallResponse); + __Marshaller_grpc_testing_StreamingInputCallRequest, + __Marshaller_grpc_testing_StreamingInputCallResponse); static readonly grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> __Method_FullDuplexCall = new grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse>( grpc::MethodType.DuplexStreaming, __ServiceName, "FullDuplexCall", - __Marshaller_StreamingOutputCallRequest, - __Marshaller_StreamingOutputCallResponse); + __Marshaller_grpc_testing_StreamingOutputCallRequest, + __Marshaller_grpc_testing_StreamingOutputCallResponse); static readonly grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse> __Method_HalfDuplexCall = new grpc::Method<global::Grpc.Testing.StreamingOutputCallRequest, global::Grpc.Testing.StreamingOutputCallResponse>( grpc::MethodType.DuplexStreaming, __ServiceName, "HalfDuplexCall", - __Marshaller_StreamingOutputCallRequest, - __Marshaller_StreamingOutputCallResponse); + __Marshaller_grpc_testing_StreamingOutputCallRequest, + __Marshaller_grpc_testing_StreamingOutputCallResponse); static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_UnimplementedCall = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>( grpc::MethodType.Unary, __ServiceName, "UnimplementedCall", - __Marshaller_Empty, - __Marshaller_Empty); + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_Empty); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor @@ -548,14 +548,14 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.UnimplementedService"; - static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_grpc_testing_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty> __Method_UnimplementedCall = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.Empty>( grpc::MethodType.Unary, __ServiceName, "UnimplementedCall", - __Marshaller_Empty, - __Marshaller_Empty); + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_Empty); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor @@ -669,23 +669,23 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.ReconnectService"; - static readonly grpc::Marshaller<global::Grpc.Testing.ReconnectParams> __Marshaller_ReconnectParams = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ReconnectParams.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.ReconnectInfo> __Marshaller_ReconnectInfo = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ReconnectInfo.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ReconnectParams> __Marshaller_grpc_testing_ReconnectParams = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ReconnectParams.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.Empty> __Marshaller_grpc_testing_Empty = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Empty.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ReconnectInfo> __Marshaller_grpc_testing_ReconnectInfo = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ReconnectInfo.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.ReconnectParams, global::Grpc.Testing.Empty> __Method_Start = new grpc::Method<global::Grpc.Testing.ReconnectParams, global::Grpc.Testing.Empty>( grpc::MethodType.Unary, __ServiceName, "Start", - __Marshaller_ReconnectParams, - __Marshaller_Empty); + __Marshaller_grpc_testing_ReconnectParams, + __Marshaller_grpc_testing_Empty); static readonly grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.ReconnectInfo> __Method_Stop = new grpc::Method<global::Grpc.Testing.Empty, global::Grpc.Testing.ReconnectInfo>( grpc::MethodType.Unary, __ServiceName, "Stop", - __Marshaller_Empty, - __Marshaller_ReconnectInfo); + __Marshaller_grpc_testing_Empty, + __Marshaller_grpc_testing_ReconnectInfo); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.IntegrationTesting/WorkerServiceGrpc.cs b/src/csharp/Grpc.IntegrationTesting/WorkerServiceGrpc.cs index 85f2cfd871..b9e8f91231 100644 --- a/src/csharp/Grpc.IntegrationTesting/WorkerServiceGrpc.cs +++ b/src/csharp/Grpc.IntegrationTesting/WorkerServiceGrpc.cs @@ -29,41 +29,41 @@ namespace Grpc.Testing { { static readonly string __ServiceName = "grpc.testing.WorkerService"; - static readonly grpc::Marshaller<global::Grpc.Testing.ServerArgs> __Marshaller_ServerArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ServerArgs.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.ServerStatus> __Marshaller_ServerStatus = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ServerStatus.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.ClientArgs> __Marshaller_ClientArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientArgs.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.ClientStatus> __Marshaller_ClientStatus = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientStatus.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.CoreRequest> __Marshaller_CoreRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.CoreResponse> __Marshaller_CoreResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreResponse.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Testing.Void> __Marshaller_Void = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Void.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ServerArgs> __Marshaller_grpc_testing_ServerArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ServerArgs.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ServerStatus> __Marshaller_grpc_testing_ServerStatus = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ServerStatus.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ClientArgs> __Marshaller_grpc_testing_ClientArgs = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientArgs.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.ClientStatus> __Marshaller_grpc_testing_ClientStatus = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.ClientStatus.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.CoreRequest> __Marshaller_grpc_testing_CoreRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.CoreResponse> __Marshaller_grpc_testing_CoreResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.CoreResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Testing.Void> __Marshaller_grpc_testing_Void = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Testing.Void.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus> __Method_RunServer = new grpc::Method<global::Grpc.Testing.ServerArgs, global::Grpc.Testing.ServerStatus>( grpc::MethodType.DuplexStreaming, __ServiceName, "RunServer", - __Marshaller_ServerArgs, - __Marshaller_ServerStatus); + __Marshaller_grpc_testing_ServerArgs, + __Marshaller_grpc_testing_ServerStatus); static readonly grpc::Method<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus> __Method_RunClient = new grpc::Method<global::Grpc.Testing.ClientArgs, global::Grpc.Testing.ClientStatus>( grpc::MethodType.DuplexStreaming, __ServiceName, "RunClient", - __Marshaller_ClientArgs, - __Marshaller_ClientStatus); + __Marshaller_grpc_testing_ClientArgs, + __Marshaller_grpc_testing_ClientStatus); static readonly grpc::Method<global::Grpc.Testing.CoreRequest, global::Grpc.Testing.CoreResponse> __Method_CoreCount = new grpc::Method<global::Grpc.Testing.CoreRequest, global::Grpc.Testing.CoreResponse>( grpc::MethodType.Unary, __ServiceName, "CoreCount", - __Marshaller_CoreRequest, - __Marshaller_CoreResponse); + __Marshaller_grpc_testing_CoreRequest, + __Marshaller_grpc_testing_CoreResponse); static readonly grpc::Method<global::Grpc.Testing.Void, global::Grpc.Testing.Void> __Method_QuitWorker = new grpc::Method<global::Grpc.Testing.Void, global::Grpc.Testing.Void>( grpc::MethodType.Unary, __ServiceName, "QuitWorker", - __Marshaller_Void, - __Marshaller_Void); + __Marshaller_grpc_testing_Void, + __Marshaller_grpc_testing_Void); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/Grpc.Reflection/ReflectionGrpc.cs b/src/csharp/Grpc.Reflection/ReflectionGrpc.cs index 387c9fb52f..c00075b7c6 100644 --- a/src/csharp/Grpc.Reflection/ReflectionGrpc.cs +++ b/src/csharp/Grpc.Reflection/ReflectionGrpc.cs @@ -29,15 +29,15 @@ namespace Grpc.Reflection.V1Alpha { { static readonly string __ServiceName = "grpc.reflection.v1alpha.ServerReflection"; - static readonly grpc::Marshaller<global::Grpc.Reflection.V1Alpha.ServerReflectionRequest> __Marshaller_ServerReflectionRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Reflection.V1Alpha.ServerReflectionRequest.Parser.ParseFrom); - static readonly grpc::Marshaller<global::Grpc.Reflection.V1Alpha.ServerReflectionResponse> __Marshaller_ServerReflectionResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Reflection.V1Alpha.ServerReflectionResponse.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Reflection.V1Alpha.ServerReflectionRequest> __Marshaller_grpc_reflection_v1alpha_ServerReflectionRequest = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Reflection.V1Alpha.ServerReflectionRequest.Parser.ParseFrom); + static readonly grpc::Marshaller<global::Grpc.Reflection.V1Alpha.ServerReflectionResponse> __Marshaller_grpc_reflection_v1alpha_ServerReflectionResponse = grpc::Marshallers.Create((arg) => global::Google.Protobuf.MessageExtensions.ToByteArray(arg), global::Grpc.Reflection.V1Alpha.ServerReflectionResponse.Parser.ParseFrom); static readonly grpc::Method<global::Grpc.Reflection.V1Alpha.ServerReflectionRequest, global::Grpc.Reflection.V1Alpha.ServerReflectionResponse> __Method_ServerReflectionInfo = new grpc::Method<global::Grpc.Reflection.V1Alpha.ServerReflectionRequest, global::Grpc.Reflection.V1Alpha.ServerReflectionResponse>( grpc::MethodType.DuplexStreaming, __ServiceName, "ServerReflectionInfo", - __Marshaller_ServerReflectionRequest, - __Marshaller_ServerReflectionResponse); + __Marshaller_grpc_reflection_v1alpha_ServerReflectionRequest, + __Marshaller_grpc_reflection_v1alpha_ServerReflectionResponse); /// <summary>Service descriptor</summary> public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor diff --git a/src/csharp/tests.json b/src/csharp/tests.json index 60f67ff3c9..c2f243fe0a 100644 --- a/src/csharp/tests.json +++ b/src/csharp/tests.json @@ -41,7 +41,9 @@ "Grpc.Core.Tests.UserAgentStringTest" ], "Grpc.Examples.Tests": [ - "Math.Tests.MathClientServerTest" + "Math.Tests.MathClientMockableTest", + "Math.Tests.MathClientServerTest", + "Math.Tests.MathServiceImplTestabilityTest" ], "Grpc.HealthCheck.Tests": [ "Grpc.HealthCheck.Tests.HealthClientServerTest", diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 5b48d06158..9783b06440 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -206,8 +206,9 @@ static NSString *const kBearerPrefix = @"Bearer "; } else { [_responseWriteable enqueueSuccessfulCompletion]; } - +#ifndef GRPC_CFSTREAM [GRPCConnectivityMonitor unregisterObserver:self]; +#endif // If the call isn't retained anywhere else, it can be deallocated now. _retainSelf = nil; @@ -462,7 +463,9 @@ static NSString *const kBearerPrefix = @"Bearer "; [self sendHeaders:_requestHeaders]; [self invokeCall]; +#ifndef GRPC_CFSTREAM [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)]; +#endif } - (void)startWithWriteable:(id<GRXWriteable>)writeable { diff --git a/src/objective-c/GRPCClient/private/GRPCHost.m b/src/objective-c/GRPCClient/private/GRPCHost.m index bd5fd94118..f4b933751f 100644 --- a/src/objective-c/GRPCClient/private/GRPCHost.m +++ b/src/objective-c/GRPCClient/private/GRPCHost.m @@ -49,7 +49,9 @@ static NSMutableDictionary *kHostCache; if (_channelCreds != nil) { grpc_channel_credentials_release(_channelCreds); } +#ifndef GRPC_CFSTREAM [GRPCConnectivityMonitor unregisterObserver:self]; +#endif } // Default initializer. @@ -84,7 +86,9 @@ static NSMutableDictionary *kHostCache; kHostCache[address] = self; _compressAlgorithm = GRPC_COMPRESS_NONE; } +#ifndef GRPC_CFSTREAM [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChange:)]; +#endif } return self; } @@ -125,6 +129,14 @@ static NSMutableDictionary *kHostCache; completionQueue:queue]; } +- (NSData *)nullTerminatedDataWithString:(NSString *)string { + // dataUsingEncoding: does not return a null-terminated string. + NSData *data = [string dataUsingEncoding:NSASCIIStringEncoding allowLossyConversion:YES]; + NSMutableData *nullTerminated = [NSMutableData dataWithData:data]; + [nullTerminated appendBytes:"\0" length:1]; + return nullTerminated; +} + - (BOOL)setTLSPEMRootCerts:(nullable NSString *)pemRootCerts withPrivateKey:(nullable NSString *)pemPrivateKey withCertChain:(nullable NSString *)pemCertChain @@ -146,13 +158,12 @@ static NSMutableDictionary *kHostCache; kDefaultRootsError = error; return; } - kDefaultRootsASCII = - [contentInUTF8 dataUsingEncoding:NSASCIIStringEncoding allowLossyConversion:YES]; + kDefaultRootsASCII = [self nullTerminatedDataWithString:contentInUTF8]; }); NSData *rootsASCII; if (pemRootCerts != nil) { - rootsASCII = [pemRootCerts dataUsingEncoding:NSASCIIStringEncoding allowLossyConversion:YES]; + rootsASCII = [self nullTerminatedDataWithString:pemRootCerts]; } else { if (kDefaultRootsASCII == nil) { if (errorPtr) { @@ -175,10 +186,8 @@ static NSMutableDictionary *kHostCache; creds = grpc_ssl_credentials_create(rootsASCII.bytes, NULL, NULL); } else { grpc_ssl_pem_key_cert_pair key_cert_pair; - NSData *privateKeyASCII = - [pemPrivateKey dataUsingEncoding:NSASCIIStringEncoding allowLossyConversion:YES]; - NSData *certChainASCII = - [pemCertChain dataUsingEncoding:NSASCIIStringEncoding allowLossyConversion:YES]; + NSData *privateKeyASCII = [self nullTerminatedDataWithString:pemPrivateKey]; + NSData *certChainASCII = [self nullTerminatedDataWithString:pemCertChain]; key_cert_pair.private_key = privateKeyASCII.bytes; key_cert_pair.cert_chain = certChainASCII.bytes; creds = grpc_ssl_credentials_create(rootsASCII.bytes, &key_cert_pair, NULL); diff --git a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj index ab7159cda2..cdd1c6c8f7 100644 --- a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj +++ b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj @@ -325,6 +325,7 @@ buildSettings = { ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Sample/Info.plist; + LD_GENERATE_MAP_FILE = YES; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; @@ -337,6 +338,7 @@ buildSettings = { ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Sample/Info.plist; + LD_GENERATE_MAP_FILE = YES; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; diff --git a/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme b/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme index d399e22e46..e356ea22a6 100644 --- a/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme +++ b/src/objective-c/examples/Sample/Sample.xcodeproj/xcshareddata/xcschemes/Sample.xcscheme @@ -42,7 +42,7 @@ </AdditionalOptions> </TestAction> <LaunchAction - buildConfiguration = "Debug" + buildConfiguration = "Release" selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB" selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB" launchStyle = "0" diff --git a/src/objective-c/tests/Connectivity/ConnectivityTestingApp.xcodeproj/project.pbxproj b/src/objective-c/tests/Connectivity/ConnectivityTestingApp.xcodeproj/project.pbxproj index 6a4c3519d6..7a03f9b41a 100644 --- a/src/objective-c/tests/Connectivity/ConnectivityTestingApp.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Connectivity/ConnectivityTestingApp.xcodeproj/project.pbxproj @@ -99,7 +99,6 @@ 5EC49F8D2043E46B00ED189A /* Sources */, 5EC49F8E2043E46B00ED189A /* Frameworks */, 5EC49F8F2043E46B00ED189A /* Resources */, - 9F67C72B6B6BAF2781078886 /* [CP] Embed Pods Frameworks */, 735516C793AF7394FBB83B7F /* [CP] Copy Pods Resources */, ); buildRules = ( @@ -194,21 +193,6 @@ shellScript = "diff \"${PODS_PODFILE_DIR_PATH}/Podfile.lock\" \"${PODS_ROOT}/Manifest.lock\" > /dev/null\nif [ $? != 0 ] ; then\n # print error to STDERR\n echo \"error: The sandbox is not in sync with the Podfile.lock. Run 'pod install' or update your CocoaPods installation.\" >&2\n exit 1\nfi\n# This output is used by Xcode 'outputs' to avoid re-running this script phase.\necho \"SUCCESS\" > \"${SCRIPT_OUTPUT_FILE_0}\"\n"; showEnvVarsInLog = 0; }; - 9F67C72B6B6BAF2781078886 /* [CP] Embed Pods Frameworks */ = { - isa = PBXShellScriptBuildPhase; - buildActionMask = 2147483647; - files = ( - ); - inputPaths = ( - ); - name = "[CP] Embed Pods Frameworks"; - outputPaths = ( - ); - runOnlyForDeploymentPostprocessing = 0; - shellPath = /bin/sh; - shellScript = "\"${SRCROOT}/Pods/Target Support Files/Pods-ConnectivityTestingApp/Pods-ConnectivityTestingApp-frameworks.sh\"\n"; - showEnvVarsInLog = 0; - }; /* End PBXShellScriptBuildPhase section */ /* Begin PBXSourcesBuildPhase section */ @@ -284,6 +268,7 @@ GCC_PREPROCESSOR_DEFINITIONS = ( "DEBUG=1", "$(inherited)", + "GRPC_CFSTREAM=1", ); GCC_WARN_64_TO_32_BIT_CONVERSION = YES; GCC_WARN_ABOUT_RETURN_TYPE = YES_ERROR; diff --git a/src/objective-c/tests/Connectivity/ConnectivityTestingApp/ViewController.m b/src/objective-c/tests/Connectivity/ConnectivityTestingApp/ViewController.m index 88780e39f0..3c77fe2f2c 100644 --- a/src/objective-c/tests/Connectivity/ConnectivityTestingApp/ViewController.m +++ b/src/objective-c/tests/Connectivity/ConnectivityTestingApp/ViewController.m @@ -35,7 +35,9 @@ NSString *host = @"grpc-test.sandbox.googleapis.com"; - (void)viewDidLoad { [super viewDidLoad]; +#ifndef GRPC_CFSTREAM [GRPCConnectivityMonitor registerObserver:self selector:@selector(reachabilityChanged:)]; +#endif } - (void)reachabilityChanged:(NSNotification *)note { diff --git a/src/objective-c/tests/Connectivity/Podfile b/src/objective-c/tests/Connectivity/Podfile index cdbc6dde59..c7127b3e78 100644 --- a/src/objective-c/tests/Connectivity/Podfile +++ b/src/objective-c/tests/Connectivity/Podfile @@ -5,9 +5,9 @@ platform :ios, '8.0' GRPC_LOCAL_SRC = '../../../..' target 'ConnectivityTestingApp' do - pod 'gRPC', :path => GRPC_LOCAL_SRC - pod 'gRPC-Core', :path => GRPC_LOCAL_SRC - pod 'gRPC-ProtoRPC', :path => GRPC_LOCAL_SRC + pod 'gRPC/CFStream', :path => GRPC_LOCAL_SRC + pod 'gRPC-Core/CFStream-Implementation', :path => GRPC_LOCAL_SRC + pod 'gRPC-ProtoRPC/CFStream', :path => GRPC_LOCAL_SRC pod 'gRPC-RxLibrary', :path => GRPC_LOCAL_SRC pod 'Protobuf', :path => "#{GRPC_LOCAL_SRC}/third_party/protobuf" pod 'BoringSSL', :podspec => "#{GRPC_LOCAL_SRC}/src/objective-c" diff --git a/src/objective-c/tests/GRPCClientTests.m b/src/objective-c/tests/GRPCClientTests.m index 5cd0231db7..d9186561c3 100644 --- a/src/objective-c/tests/GRPCClientTests.m +++ b/src/objective-c/tests/GRPCClientTests.m @@ -525,7 +525,8 @@ static GRPCProtoMethod *kFullDuplexCallMethod; - (void)testErrorCode { int port = [self findFreePort]; NSString *const kDummyAddress = [NSString stringWithFormat:@"localhost:%d", port]; - __weak XCTestExpectation *completion = [self expectationWithDescription:@"Empty RPC completed."]; + __weak XCTestExpectation *completion = + [self expectationWithDescription:@"Received correct error code."]; GRPCCall *call = [[GRPCCall alloc] initWithHost:kDummyAddress path:kEmptyCallMethod.HTTPPath diff --git a/src/objective-c/tests/analyze_link_map.py b/src/objective-c/tests/analyze_link_map.py deleted file mode 100755 index 48e3441087..0000000000 --- a/src/objective-c/tests/analyze_link_map.py +++ /dev/null @@ -1,78 +0,0 @@ -#!/usr/bin/python -# Copyright 2018 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This script analyzes link map file generated by Xcode. It calculates and -# prints out the sizes of each dependent library and the total sizes of the -# symbols. -# The script takes one parameter, which is the path to the link map file. - -import sys -import re - -table_tag = {} -state = "start" - -table_stats_symbol = {} -table_stats_dead = {} -section_total_size = 0 -symbol_total_size = 0 - - -file_import = sys.argv[1] -lines = list(open(file_import)) -for line in lines: - line_stripped = line[:-1] - if "# Object files:" == line_stripped: - state = "object" - continue - elif "# Sections:" == line_stripped: - state = "section" - continue - elif "# Symbols:" == line_stripped: - state = "symbol" - continue - elif "# Dead Stripped Symbols:" == line_stripped: - state = "dead" - continue - - if state == "object": - segs = re.search('(\[ *[0-9]*\]) (.*)', line_stripped) - table_tag[segs.group(1)] = segs.group(2) - - if state == "section": - if len(line_stripped) == 0 or line_stripped[0] == '#': - continue - segs = re.search('^(.+?)\s+(.+?)\s+.*', line_stripped) - section_total_size += int(segs.group(2), 16) - - if state == "symbol": - if len(line_stripped) == 0 or line_stripped[0] == '#': - continue - segs = re.search('^.+?\s+(.+?)\s+(\[.+?\]).*', line_stripped) - target = table_tag[segs.group(2)] - target_stripped = re.search('^(.*?)(\(.+?\))?$', target).group(1) - size = int(segs.group(1), 16) - if not target_stripped in table_stats_symbol: - table_stats_symbol[target_stripped] = 0 - table_stats_symbol[target_stripped] += size - -print("Sections total size: %d" % section_total_size) - -for target in table_stats_symbol: - print(target) - print(table_stats_symbol[target]) - symbol_total_size += table_stats_symbol[target] - -print("Symbols total size: %d" % symbol_total_size) diff --git a/src/objective-c/tests/build_one_example.sh b/src/objective-c/tests/build_one_example.sh index 985d55f3cc..1eace541e6 100755 --- a/src/objective-c/tests/build_one_example.sh +++ b/src/objective-c/tests/build_one_example.sh @@ -42,6 +42,9 @@ xcodebuild \ build \ -workspace *.xcworkspace \ -scheme $SCHEME \ - -destination name="iPhone 6" \ + -destination generic/platform=iOS \ + -derivedDataPath Build \ + CODE_SIGN_IDENTITY="" \ + CODE_SIGNING_REQUIRED=NO \ | egrep -v "$XCODEBUILD_FILTER" \ | egrep -v "^$" - diff --git a/src/proto/grpc/testing/messages.proto b/src/proto/grpc/testing/messages.proto index 0b8c15711e..7b1b7286dc 100644 --- a/src/proto/grpc/testing/messages.proto +++ b/src/proto/grpc/testing/messages.proto @@ -27,7 +27,6 @@ message BoolValue { bool value = 1; } -// DEPRECATED, don't use. To be removed shortly. // The type of payload that should be returned. enum PayloadType { // Compressable text format. @@ -36,7 +35,6 @@ enum PayloadType { // A block of data, to simply increase gRPC message size. message Payload { - // DEPRECATED, don't use. To be removed shortly. // The type of data in body. PayloadType type = 1; // Primary contents of payload. @@ -52,7 +50,6 @@ message EchoStatus { // Unary request. message SimpleRequest { - // DEPRECATED, don't use. To be removed shortly. // Desired payload type in the response from the server. // If response_type is RANDOM, server randomly chooses one from other formats. PayloadType response_type = 1; @@ -131,7 +128,6 @@ message ResponseParameters { // Server-streaming request. message StreamingOutputCallRequest { - // DEPRECATED, don't use. To be removed shortly. // Desired payload type in the response from the server. // If response_type is RANDOM, the payload from each response in the stream // might be of different types. This is to simulate a mixed type of payload diff --git a/src/python/grpcio/grpc/__init__.py b/src/python/grpcio/grpc/__init__.py index 0f31119467..996d075446 100644 --- a/src/python/grpcio/grpc/__init__.py +++ b/src/python/grpcio/grpc/__init__.py @@ -1482,7 +1482,7 @@ def ssl_server_credentials(private_key_certificate_chain_pairs, A ServerCredentials for use with an SSL-enabled Server. Typically, this object is an argument to add_secure_port() method during server setup. """ - if len(private_key_certificate_chain_pairs) == 0: + if not private_key_certificate_chain_pairs: raise ValueError( 'At least one private key-certificate chain pair is required!') elif require_client_auth and root_certificates is None: @@ -1512,15 +1512,15 @@ def ssl_server_certificate_configuration(private_key_certificate_chain_pairs, A ServerCertificateConfiguration that can be returned in the certificate configuration fetching callback. """ - if len(private_key_certificate_chain_pairs) == 0: - raise ValueError( - 'At least one private key-certificate chain pair is required!') - else: + if private_key_certificate_chain_pairs: return ServerCertificateConfiguration( _cygrpc.server_certificate_config_ssl(root_certificates, [ _cygrpc.SslPemKeyCertPair(key, pem) for key, pem in private_key_certificate_chain_pairs ])) + else: + raise ValueError( + 'At least one private key-certificate chain pair is required!') def dynamic_ssl_server_credentials(initial_certificate_configuration, @@ -1656,9 +1656,11 @@ def server(thread_pool, A Server object. """ from grpc import _server # pylint: disable=cyclic-import - return _server.Server(thread_pool, () if handlers is None else handlers, () - if interceptors is None else interceptors, () if - options is None else options, maximum_concurrent_rpcs) + return _server.create_server(thread_pool, () + if handlers is None else handlers, () + if interceptors is None else interceptors, () + if options is None else options, + maximum_concurrent_rpcs) ################################### __all__ ################################# diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 9fe946b2d5..e9246991df 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -192,7 +192,7 @@ def _consume_request_iterator(request_iterator, state, call, request_serializer, with state.condition: if state.code is None and not state.cancelled: if serialized_request is None: - code = grpc.StatusCode.INTERNAL # pylint: disable=redefined-variable-type + code = grpc.StatusCode.INTERNAL details = 'Exception serializing request!' call.cancel( _common.STATUS_CODE_TO_CYGRPC_STATUS_CODE[code], @@ -813,10 +813,7 @@ def _poll_connectivity(state, channel, initial_try_to_connect): _common.CYGRPC_CONNECTIVITY_STATE_TO_CHANNEL_CONNECTIVITY[ connectivity]) if not state.delivering: - # NOTE(nathaniel): The field is only ever used as a - # sequence so it's fine that both lists and tuples are - # assigned to it. - callbacks = _deliveries(state) # pylint: disable=redefined-variable-type + callbacks = _deliveries(state) if callbacks: _spawn_delivery(state, callbacks) diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi index 7e9ea33ca0..8d73215247 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pxd.pxi @@ -57,6 +57,11 @@ cdef class ChannelCredentials: cdef grpc_channel_credentials *c_credentials +cdef class SSLSessionCacheLRU: + + cdef grpc_ssl_session_cache *_cache + + cdef class SSLChannelCredentials(ChannelCredentials): cdef readonly object _pem_root_certificates diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi index dff9097bf9..f4ccfbc016 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/credentials.pyx.pxi @@ -17,6 +17,9 @@ cimport cpython import grpc import threading +from libc.stdint cimport uintptr_t + + def _spawn_callback_in_thread(cb_func, args): threading.Thread(target=cb_func, args=args).start() @@ -29,6 +32,7 @@ def set_async_callback_func(callback_func): def _spawn_callback_async(callback, args): async_callback_func(callback, args) + cdef class CallCredentials: cdef grpc_call_credentials *c(self): @@ -107,6 +111,21 @@ cdef class ChannelCredentials: raise NotImplementedError() +cdef class SSLSessionCacheLRU: + + def __cinit__(self, capacity): + grpc_init() + self._cache = grpc_ssl_session_cache_create_lru(capacity) + + def __int__(self): + return <uintptr_t>self._cache + + def __dealloc__(self): + if self._cache != NULL: + grpc_ssl_session_cache_destroy(self._cache) + grpc_shutdown() + + cdef class SSLChannelCredentials(ChannelCredentials): def __cinit__(self, pem_root_certificates, private_key, certificate_chain): diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi index 2d6c900c54..cfefeaf938 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc.pxi @@ -131,6 +131,7 @@ cdef extern from "grpc/grpc.h": const char *GRPC_ARG_PRIMARY_USER_AGENT_STRING const char *GRPC_ARG_SECONDARY_USER_AGENT_STRING const char *GRPC_SSL_TARGET_NAME_OVERRIDE_ARG + const char *GRPC_SSL_SESSION_CACHE_ARG const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM const char *GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL const char *GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET @@ -452,8 +453,16 @@ cdef extern from "grpc/grpc_security.h": # We don't care about the internals (and in fact don't know them) pass + + ctypedef struct grpc_ssl_session_cache: + # We don't care about the internals (and in fact don't know them) + pass + ctypedef void (*grpc_ssl_roots_override_callback)(char **pem_root_certs) + grpc_ssl_session_cache *grpc_ssl_session_cache_create_lru(size_t capacity) + void grpc_ssl_session_cache_destroy(grpc_ssl_session_cache* cache) + void grpc_set_ssl_roots_override_callback( grpc_ssl_roots_override_callback cb) nogil diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi index f5688d08cd..f5688d08cd 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pxd.pxi diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi index f9a1b2856d..f9a1b2856d 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx +++ b/src/python/grpcio/grpc/_cython/_cygrpc/grpc_gevent.pyx.pxi diff --git a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi index ecd991685f..37b98ebbdb 100644 --- a/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi +++ b/src/python/grpcio/grpc/_cython/_cygrpc/records.pyx.pxi @@ -51,6 +51,7 @@ class ChannelArgKey: default_authority = GRPC_ARG_DEFAULT_AUTHORITY primary_user_agent_string = GRPC_ARG_PRIMARY_USER_AGENT_STRING secondary_user_agent_string = GRPC_ARG_SECONDARY_USER_AGENT_STRING + ssl_session_cache = GRPC_SSL_SESSION_CACHE_ARG ssl_target_name_override = GRPC_SSL_TARGET_NAME_OVERRIDE_ARG diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pxd b/src/python/grpcio/grpc/_cython/cygrpc.pxd index c8ace7c3cc..e33c01c28f 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pxd +++ b/src/python/grpcio/grpc/_cython/cygrpc.pxd @@ -29,4 +29,4 @@ include "_cygrpc/server.pxd.pxi" include "_cygrpc/tag.pxd.pxi" include "_cygrpc/time.pxd.pxi" -include "_cygrpc/grpc_gevent.pxd" +include "_cygrpc/grpc_gevent.pxd.pxi" diff --git a/src/python/grpcio/grpc/_cython/cygrpc.pyx b/src/python/grpcio/grpc/_cython/cygrpc.pyx index f5f08fc983..fb16fb16bf 100644 --- a/src/python/grpcio/grpc/_cython/cygrpc.pyx +++ b/src/python/grpcio/grpc/_cython/cygrpc.pyx @@ -36,7 +36,7 @@ include "_cygrpc/server.pyx.pxi" include "_cygrpc/tag.pyx.pxi" include "_cygrpc/time.pyx.pxi" -include "_cygrpc/grpc_gevent.pyx" +include "_cygrpc/grpc_gevent.pyx.pxi" # # initialize gRPC diff --git a/src/python/grpcio/grpc/_interceptor.py b/src/python/grpcio/grpc/_interceptor.py index f465e35a9c..6b7a912a94 100644 --- a/src/python/grpcio/grpc/_interceptor.py +++ b/src/python/grpcio/grpc/_interceptor.py @@ -100,6 +100,12 @@ class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): def cancelled(self): return False + def is_active(self): + return False + + def time_remaining(self): + return None + def running(self): return False @@ -115,6 +121,9 @@ class _LocalFailure(grpc.RpcError, grpc.Future, grpc.Call): def traceback(self, ignored_timeout=None): return self._traceback + def add_callback(self, callback): + return False + def add_done_callback(self, fn): fn(self) @@ -288,11 +297,11 @@ class _Channel(grpc.Channel): self._channel = channel self._interceptor = interceptor - def subscribe(self, *args, **kwargs): - self._channel.subscribe(*args, **kwargs) + def subscribe(self, callback, try_to_connect=False): + self._channel.subscribe(callback, try_to_connect=try_to_connect) - def unsubscribe(self, *args, **kwargs): - self._channel.unsubscribe(*args, **kwargs) + def unsubscribe(self, callback): + self._channel.unsubscribe(callback) def unary_unary(self, method, diff --git a/src/python/grpcio/grpc/_server.py b/src/python/grpcio/grpc/_server.py index d12a2421cd..7276a7fd90 100644 --- a/src/python/grpcio/grpc/_server.py +++ b/src/python/grpcio/grpc/_server.py @@ -330,6 +330,8 @@ class _RequestIterator(object): self._state.request = None return request + raise AssertionError() # should never run + def _next(self): with self._state.condition: self._raise_or_start_receive_message() @@ -787,7 +789,16 @@ def _start(state): thread.start() -class Server(grpc.Server): +def _validate_generic_rpc_handlers(generic_rpc_handlers): + for generic_rpc_handler in generic_rpc_handlers: + service_attribute = getattr(generic_rpc_handler, 'service', None) + if service_attribute is None: + raise AttributeError( + '"{}" must conform to grpc.GenericRpcHandler type but does ' + 'not have "service" method!'.format(generic_rpc_handler)) + + +class _Server(grpc.Server): # pylint: disable=too-many-arguments def __init__(self, thread_pool, generic_handlers, interceptors, options, @@ -800,6 +811,7 @@ class Server(grpc.Server): thread_pool, maximum_concurrent_rpcs) def add_generic_rpc_handlers(self, generic_rpc_handlers): + _validate_generic_rpc_handlers(generic_rpc_handlers) _add_generic_handlers(self._state, generic_rpc_handlers) def add_insecure_port(self, address): @@ -817,3 +829,10 @@ class Server(grpc.Server): def __del__(self): _stop(self._state, None) + + +def create_server(thread_pool, generic_rpc_handlers, interceptors, options, + maximum_concurrent_rpcs): + _validate_generic_rpc_handlers(generic_rpc_handlers) + return _Server(thread_pool, generic_rpc_handlers, interceptors, options, + maximum_concurrent_rpcs) diff --git a/src/python/grpcio/grpc/_utilities.py b/src/python/grpcio/grpc/_utilities.py index 25bd1ceae2..d90b34bcbd 100644 --- a/src/python/grpcio/grpc/_utilities.py +++ b/src/python/grpcio/grpc/_utilities.py @@ -116,6 +116,8 @@ class _ChannelReadyFuture(grpc.Future): callable_util.call_logging_exceptions( done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + return True + def cancelled(self): with self._condition: return self._cancelled diff --git a/src/python/grpcio/grpc/beta/_server_adaptations.py b/src/python/grpcio/grpc/beta/_server_adaptations.py index ccafec8951..80ac65b649 100644 --- a/src/python/grpcio/grpc/beta/_server_adaptations.py +++ b/src/python/grpcio/grpc/beta/_server_adaptations.py @@ -305,6 +305,7 @@ def _simple_method_handler(implementation, request_deserializer, response_serializer, None, None, None, _adapt_stream_stream_event( implementation.stream_stream_event)) + raise ValueError() def _flatten_method_pair_map(method_pair_map): diff --git a/src/python/grpcio/grpc/beta/utilities.py b/src/python/grpcio/grpc/beta/utilities.py index b5d8aac71a..fe3ce606c9 100644 --- a/src/python/grpcio/grpc/beta/utilities.py +++ b/src/python/grpcio/grpc/beta/utilities.py @@ -85,6 +85,8 @@ class _ChannelReadyFuture(future.Future): callable_util.call_logging_exceptions( done_callback, _DONE_CALLBACK_EXCEPTION_LOG_MESSAGE, self) + return True + def cancelled(self): with self._condition: return self._cancelled diff --git a/src/python/grpcio/grpc/experimental/session_cache.py b/src/python/grpcio/grpc/experimental/session_cache.py new file mode 100644 index 0000000000..5c55f7c327 --- /dev/null +++ b/src/python/grpcio/grpc/experimental/session_cache.py @@ -0,0 +1,45 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""gRPC's APIs for TLS Session Resumption support""" + +from grpc._cython import cygrpc as _cygrpc + + +def ssl_session_cache_lru(capacity): + """Creates an SSLSessionCache with LRU replacement policy + + Args: + capacity: Size of the cache + + Returns: + An SSLSessionCache with LRU replacement policy that can be passed as a value for + the grpc.ssl_session_cache option to a grpc.Channel. SSL session caches are used + to store session tickets, which clients can present to resume previous TLS sessions + with a server. + """ + return SSLSessionCache(_cygrpc.SSLSessionCacheLRU(capacity)) + + +class SSLSessionCache(object): + """An encapsulation of a session cache used for TLS session resumption. + + Instances of this class can be passed to a Channel as values for the + grpc.ssl_session_cache option + """ + + def __init__(self, cache): + self._cache = cache + + def __int__(self): + return int(self._cache) diff --git a/src/python/grpcio/grpc/framework/foundation/stream_util.py b/src/python/grpcio/grpc/framework/foundation/stream_util.py index ed0448aa08..1faaf29bd7 100644 --- a/src/python/grpcio/grpc/framework/foundation/stream_util.py +++ b/src/python/grpcio/grpc/framework/foundation/stream_util.py @@ -47,10 +47,10 @@ class IterableConsumer(stream.Consumer): self._values = [] self._active = True - def consume(self, stock_reply): + def consume(self, value): with self._condition: if self._active: - self._values.append(stock_reply) + self._values.append(value) self._condition.notify() def terminate(self): @@ -58,10 +58,10 @@ class IterableConsumer(stream.Consumer): self._active = False self._condition.notify() - def consume_and_terminate(self, stock_reply): + def consume_and_terminate(self, value): with self._condition: if self._active: - self._values.append(stock_reply) + self._values.append(value) self._active = False self._condition.notify() diff --git a/src/python/grpcio_testing/grpc_testing/__init__.py b/src/python/grpcio_testing/grpc_testing/__init__.py index e87d0ffc96..65fdd1b8ca 100644 --- a/src/python/grpcio_testing/grpc_testing/__init__.py +++ b/src/python/grpcio_testing/grpc_testing/__init__.py @@ -14,9 +14,9 @@ """Objects for use in testing gRPC Python-using application code.""" import abc +import six from google.protobuf import descriptor -import six import grpc diff --git a/src/python/grpcio_testing/grpc_testing/_server/_handler.py b/src/python/grpcio_testing/grpc_testing/_server/_handler.py index d4f50f6863..0e3404b0d0 100644 --- a/src/python/grpcio_testing/grpc_testing/_server/_handler.py +++ b/src/python/grpcio_testing/grpc_testing/_server/_handler.py @@ -105,10 +105,10 @@ class _Handler(Handler): self._expiration_future.cancel() self._condition.notify_all() - def add_termination_callback(self, termination_callback): + def add_termination_callback(self, callback): with self._condition: if self._code is None: - self._termination_callbacks.append(termination_callback) + self._termination_callbacks.append(callback) return True else: return False diff --git a/src/python/grpcio_tests/tests/_loader.py b/src/python/grpcio_tests/tests/_loader.py index be0af64646..80c107aa8e 100644 --- a/src/python/grpcio_tests/tests/_loader.py +++ b/src/python/grpcio_tests/tests/_loader.py @@ -48,12 +48,13 @@ class Loader(object): # measure unnecessarily suffers) coverage_context = coverage.Coverage(data_suffix=True) coverage_context.start() - modules = [importlib.import_module(name) for name in names] - for module in modules: - self.visit_module(module) - for module in modules: + imported_modules = tuple( + importlib.import_module(name) for name in names) + for imported_module in imported_modules: + self.visit_module(imported_module) + for imported_module in imported_modules: try: - package_paths = module.__path__ + package_paths = imported_module.__path__ except AttributeError: continue self.walk_packages(package_paths) diff --git a/src/python/grpcio_tests/tests/_result.py b/src/python/grpcio_tests/tests/_result.py index b105f18e78..e5378b7ea3 100644 --- a/src/python/grpcio_tests/tests/_result.py +++ b/src/python/grpcio_tests/tests/_result.py @@ -144,10 +144,6 @@ class AugmentedResult(unittest.TestResult): super(AugmentedResult, self).startTestRun() self.cases = dict() - def stopTestRun(self): - """See unittest.TestResult.stopTestRun.""" - super(AugmentedResult, self).stopTestRun() - def startTest(self, test): """See unittest.TestResult.startTest.""" super(AugmentedResult, self).startTest(test) @@ -155,19 +151,19 @@ class AugmentedResult(unittest.TestResult): self.cases[case_id] = CaseResult( id=case_id, name=test.id(), kind=CaseResult.Kind.RUNNING) - def addError(self, test, error): + def addError(self, test, err): """See unittest.TestResult.addError.""" - super(AugmentedResult, self).addError(test, error) + super(AugmentedResult, self).addError(test, err) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( - kind=CaseResult.Kind.ERROR, traceback=error) + kind=CaseResult.Kind.ERROR, traceback=err) - def addFailure(self, test, error): + def addFailure(self, test, err): """See unittest.TestResult.addFailure.""" - super(AugmentedResult, self).addFailure(test, error) + super(AugmentedResult, self).addFailure(test, err) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( - kind=CaseResult.Kind.FAILURE, traceback=error) + kind=CaseResult.Kind.FAILURE, traceback=err) def addSuccess(self, test): """See unittest.TestResult.addSuccess.""" @@ -183,12 +179,12 @@ class AugmentedResult(unittest.TestResult): self.cases[case_id] = self.cases[case_id].updated( kind=CaseResult.Kind.SKIP, skip_reason=reason) - def addExpectedFailure(self, test, error): + def addExpectedFailure(self, test, err): """See unittest.TestResult.addExpectedFailure.""" - super(AugmentedResult, self).addExpectedFailure(test, error) + super(AugmentedResult, self).addExpectedFailure(test, err) case_id = self.id_map(test) self.cases[case_id] = self.cases[case_id].updated( - kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=error) + kind=CaseResult.Kind.EXPECTED_FAILURE, traceback=err) def addUnexpectedSuccess(self, test): """See unittest.TestResult.addUnexpectedSuccess.""" @@ -249,13 +245,6 @@ class CoverageResult(AugmentedResult): self.coverage_context.save() self.coverage_context = None - def stopTestRun(self): - """See unittest.TestResult.stopTestRun.""" - super(CoverageResult, self).stopTestRun() - # TODO(atash): Dig deeper into why the following line fails to properly - # combine coverage data from the Cython plugin. - #coverage.Coverage().combine() - class _Colors(object): """Namespaced constants for terminal color magic numbers.""" @@ -295,16 +284,16 @@ class TerminalResult(CoverageResult): self.out.write(summary(self)) self.out.flush() - def addError(self, test, error): + def addError(self, test, err): """See unittest.TestResult.addError.""" - super(TerminalResult, self).addError(test, error) + super(TerminalResult, self).addError(test, err) self.out.write( _Colors.FAIL + 'ERROR {}\n'.format(test.id()) + _Colors.END) self.out.flush() - def addFailure(self, test, error): + def addFailure(self, test, err): """See unittest.TestResult.addFailure.""" - super(TerminalResult, self).addFailure(test, error) + super(TerminalResult, self).addFailure(test, err) self.out.write( _Colors.FAIL + 'FAILURE {}\n'.format(test.id()) + _Colors.END) self.out.flush() @@ -323,9 +312,9 @@ class TerminalResult(CoverageResult): _Colors.INFO + 'SKIP {}\n'.format(test.id()) + _Colors.END) self.out.flush() - def addExpectedFailure(self, test, error): + def addExpectedFailure(self, test, err): """See unittest.TestResult.addExpectedFailure.""" - super(TerminalResult, self).addExpectedFailure(test, error) + super(TerminalResult, self).addExpectedFailure(test, err) self.out.write( _Colors.INFO + 'FAILURE_OK {}\n'.format(test.id()) + _Colors.END) self.out.flush() diff --git a/src/python/grpcio_tests/tests/interop/methods.py b/src/python/grpcio_tests/tests/interop/methods.py index b728ffd704..cda15a68a3 100644 --- a/src/python/grpcio_tests/tests/interop/methods.py +++ b/src/python/grpcio_tests/tests/interop/methods.py @@ -144,8 +144,8 @@ def _large_unary_common_behavior(stub, fill_username, fill_oauth_scope, def _empty_unary(stub): response = stub.EmptyCall(empty_pb2.Empty()) if not isinstance(response, empty_pb2.Empty): - raise TypeError('response is of type "%s", not empty_pb2.Empty!', - type(response)) + raise TypeError( + 'response is of type "%s", not empty_pb2.Empty!' % type(response)) def _large_unary(stub): diff --git a/src/python/grpcio_tests/tests/tests.json b/src/python/grpcio_tests/tests/tests.json index 0d94426413..ebc41c63f0 100644 --- a/src/python/grpcio_tests/tests/tests.json +++ b/src/python/grpcio_tests/tests/tests.json @@ -53,6 +53,8 @@ "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestCertConfigReuse", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithClientAuth", "unit._server_ssl_cert_config_test.ServerSSLCertReloadTestWithoutClientAuth", + "unit._server_test.ServerTest", + "unit._session_cache_test.SSLSessionCacheTest", "unit.beta._beta_features_test.BetaFeaturesTest", "unit.beta._beta_features_test.ContextManagementAndLifecycleTest", "unit.beta._connectivity_channel_test.ConnectivityStatesTest", diff --git a/src/python/grpcio_tests/tests/unit/_auth_context_test.py b/src/python/grpcio_tests/tests/unit/_auth_context_test.py index 8c1a30e032..d174051070 100644 --- a/src/python/grpcio_tests/tests/unit/_auth_context_test.py +++ b/src/python/grpcio_tests/tests/unit/_auth_context_test.py @@ -18,6 +18,7 @@ import unittest import grpc from grpc import _channel +from grpc.experimental import session_cache import six from tests.unit import test_common @@ -140,6 +141,50 @@ class AuthContextTest(unittest.TestCase): self.assertSequenceEqual([b'*.test.google.com'], auth_ctx['x509_common_name']) + def _do_one_shot_client_rpc(self, channel_creds, channel_options, port, + expect_ssl_session_reused): + channel = grpc.secure_channel( + 'localhost:{}'.format(port), channel_creds, options=channel_options) + response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + auth_data = pickle.loads(response) + self.assertEqual(expect_ssl_session_reused, + auth_data[_AUTH_CTX]['ssl_session_reused']) + channel.close() + + def testSessionResumption(self): + # Set up a secure server + handler = grpc.method_handlers_generic_handler('test', { + 'UnaryUnary': + grpc.unary_unary_rpc_method_handler(handle_unary_unary) + }) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) + server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) + port = server.add_secure_port('[::]:0', server_cred) + server.start() + + # Create a cache for TLS session tickets + cache = session_cache.ssl_session_cache_lru(1) + channel_creds = grpc.ssl_channel_credentials( + root_certificates=_TEST_ROOT_CERTIFICATES) + channel_options = _PROPERTY_OPTIONS + ( + ('grpc.ssl_session_cache', cache),) + + # Initial connection has no session to resume + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port, + expect_ssl_session_reused=[b'false']) + + # Subsequent connections resume sessions + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port, + expect_ssl_session_reused=[b'true']) + server.stop(None) + if __name__ == '__main__': unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_junkdrawer/__init__.py b/src/python/grpcio_tests/tests/unit/_junkdrawer/__init__.py deleted file mode 100644 index 5fb4f3c3cf..0000000000 --- a/src/python/grpcio_tests/tests/unit/_junkdrawer/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/src/python/grpcio_tests/tests/unit/_junkdrawer/stock_pb2.py b/src/python/grpcio_tests/tests/unit/_junkdrawer/stock_pb2.py deleted file mode 100644 index 2bf1e1cc0d..0000000000 --- a/src/python/grpcio_tests/tests/unit/_junkdrawer/stock_pb2.py +++ /dev/null @@ -1,164 +0,0 @@ -# Copyright 2015 gRPC authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# TODO(nathaniel): Remove this from source control after having made -# generation from the stock.proto source part of GRPC's build-and-test -# process. - -# Generated by the protocol buffer compiler. DO NOT EDIT! -# source: stock.proto - -import sys -_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode('latin1')) -from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection -from google.protobuf import symbol_database as _symbol_database -from google.protobuf import descriptor_pb2 -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - -DESCRIPTOR = _descriptor.FileDescriptor( - name='stock.proto', - package='stock', - serialized_pb=_b( - '\n\x0bstock.proto\x12\x05stock\">\n\x0cStockRequest\x12\x0e\n\x06symbol\x18\x01 \x01(\t\x12\x1e\n\x13num_trades_to_watch\x18\x02 \x01(\x05:\x01\x30\"+\n\nStockReply\x12\r\n\x05price\x18\x01 \x01(\x02\x12\x0e\n\x06symbol\x18\x02 \x01(\t2\x96\x02\n\x05Stock\x12=\n\x11GetLastTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x12I\n\x19GetLastTradePriceMultiple\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01\x30\x01\x12?\n\x11WatchFutureTrades\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00\x30\x01\x12\x42\n\x14GetHighestTradePrice\x12\x13.stock.StockRequest\x1a\x11.stock.StockReply\"\x00(\x01' - )) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -_STOCKREQUEST = _descriptor.Descriptor( - name='StockRequest', - full_name='stock.StockRequest', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='symbol', - full_name='stock.StockRequest.symbol', - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode('utf-8'), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='num_trades_to_watch', - full_name='stock.StockRequest.num_trades_to_watch', - index=1, - number=2, - type=5, - cpp_type=1, - label=1, - has_default_value=True, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None), - ], - extensions=[], - nested_types=[], - enum_types=[], - options=None, - is_extendable=False, - extension_ranges=[], - oneofs=[], - serialized_start=22, - serialized_end=84,) - -_STOCKREPLY = _descriptor.Descriptor( - name='StockReply', - full_name='stock.StockReply', - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name='price', - full_name='stock.StockReply.price', - index=0, - number=1, - type=2, - cpp_type=6, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None), - _descriptor.FieldDescriptor( - name='symbol', - full_name='stock.StockReply.symbol', - index=1, - number=2, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode('utf-8'), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None), - ], - extensions=[], - nested_types=[], - enum_types=[], - options=None, - is_extendable=False, - extension_ranges=[], - oneofs=[], - serialized_start=86, - serialized_end=129,) - -DESCRIPTOR.message_types_by_name['StockRequest'] = _STOCKREQUEST -DESCRIPTOR.message_types_by_name['StockReply'] = _STOCKREPLY - -StockRequest = _reflection.GeneratedProtocolMessageType( - 'StockRequest', - (_message.Message,), - dict( - DESCRIPTOR=_STOCKREQUEST, - __module__='stock_pb2' - # @@protoc_insertion_point(class_scope:stock.StockRequest) - )) -_sym_db.RegisterMessage(StockRequest) - -StockReply = _reflection.GeneratedProtocolMessageType( - 'StockReply', - (_message.Message,), - dict( - DESCRIPTOR=_STOCKREPLY, - __module__='stock_pb2' - # @@protoc_insertion_point(class_scope:stock.StockReply) - )) -_sym_db.RegisterMessage(StockReply) - -# @@protoc_insertion_point(module_scope) diff --git a/src/python/grpcio_tests/tests/unit/_server_test.py b/src/python/grpcio_tests/tests/unit/_server_test.py new file mode 100644 index 0000000000..acf4a17921 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_server_test.py @@ -0,0 +1,52 @@ +# Copyright 2018 The gRPC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from concurrent import futures +import unittest + +import grpc + + +class _ActualGenericRpcHandler(grpc.GenericRpcHandler): + + def service(self, handler_call_details): + return None + + +class ServerTest(unittest.TestCase): + + def test_not_a_generic_rpc_handler_at_construction(self): + with self.assertRaises(AttributeError) as exception_context: + grpc.server( + futures.ThreadPoolExecutor(max_workers=5), + handlers=[ + _ActualGenericRpcHandler(), + object(), + ]) + self.assertIn('grpc.GenericRpcHandler', + str(exception_context.exception)) + + def test_not_a_generic_rpc_handler_after_construction(self): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=5)) + with self.assertRaises(AttributeError) as exception_context: + server.add_generic_rpc_handlers([ + _ActualGenericRpcHandler(), + object(), + ]) + self.assertIn('grpc.GenericRpcHandler', + str(exception_context.exception)) + + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/src/python/grpcio_tests/tests/unit/_session_cache_test.py b/src/python/grpcio_tests/tests/unit/_session_cache_test.py new file mode 100644 index 0000000000..b4e4670fa7 --- /dev/null +++ b/src/python/grpcio_tests/tests/unit/_session_cache_test.py @@ -0,0 +1,145 @@ +# Copyright 2018 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests experimental TLS Session Resumption API""" + +import pickle +import unittest + +import grpc +from grpc import _channel +from grpc.experimental import session_cache + +from tests.unit import test_common +from tests.unit import resources + +_REQUEST = b'\x00\x00\x00' +_RESPONSE = b'\x00\x00\x00' + +_UNARY_UNARY = '/test/UnaryUnary' + +_SERVER_HOST_OVERRIDE = 'foo.test.google.fr' +_ID = 'id' +_ID_KEY = 'id_key' +_AUTH_CTX = 'auth_ctx' + +_PRIVATE_KEY = resources.private_key() +_CERTIFICATE_CHAIN = resources.certificate_chain() +_TEST_ROOT_CERTIFICATES = resources.test_root_certificates() +_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),) +_PROPERTY_OPTIONS = (( + 'grpc.ssl_target_name_override', + _SERVER_HOST_OVERRIDE, +),) + + +def handle_unary_unary(request, servicer_context): + return pickle.dumps({ + _ID: servicer_context.peer_identities(), + _ID_KEY: servicer_context.peer_identity_key(), + _AUTH_CTX: servicer_context.auth_context() + }) + + +def start_secure_server(): + handler = grpc.method_handlers_generic_handler('test', { + 'UnaryUnary': + grpc.unary_unary_rpc_method_handler(handle_unary_unary) + }) + server = test_common.test_server() + server.add_generic_rpc_handlers((handler,)) + server_cred = grpc.ssl_server_credentials(_SERVER_CERTS) + port = server.add_secure_port('[::]:0', server_cred) + server.start() + + return server, port + + +class SSLSessionCacheTest(unittest.TestCase): + + def _do_one_shot_client_rpc(self, channel_creds, channel_options, port, + expect_ssl_session_reused): + channel = grpc.secure_channel( + 'localhost:{}'.format(port), channel_creds, options=channel_options) + response = channel.unary_unary(_UNARY_UNARY)(_REQUEST) + auth_data = pickle.loads(response) + self.assertEqual(expect_ssl_session_reused, + auth_data[_AUTH_CTX]['ssl_session_reused']) + channel.close() + + def testSSLSessionCacheLRU(self): + server_1, port_1 = start_secure_server() + + cache = session_cache.ssl_session_cache_lru(1) + channel_creds = grpc.ssl_channel_credentials( + root_certificates=_TEST_ROOT_CERTIFICATES) + channel_options = _PROPERTY_OPTIONS + ( + ('grpc.ssl_session_cache', cache),) + + # Initial connection has no session to resume + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_1, + expect_ssl_session_reused=[b'false']) + + # Connection to server_1 resumes from initial session + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_1, + expect_ssl_session_reused=[b'true']) + + # Connection to a different server with the same name overwrites the cache entry + server_2, port_2 = start_secure_server() + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_2, + expect_ssl_session_reused=[b'false']) + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_2, + expect_ssl_session_reused=[b'true']) + server_2.stop(None) + + # Connection to server_1 now falls back to full TLS handshake + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_1, + expect_ssl_session_reused=[b'false']) + + # Re-creating server_1 causes old sessions to become invalid + server_1.stop(None) + server_1, port_1 = start_secure_server() + + # Old sessions should no longer be valid + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_1, + expect_ssl_session_reused=[b'false']) + + # Resumption should work for subsequent connections + self._do_one_shot_client_rpc( + channel_creds, + channel_options, + port_1, + expect_ssl_session_reused=[b'true']) + server_1.stop(None) + + +if __name__ == '__main__': + unittest.main(verbosity=2) |