diff options
Diffstat (limited to 'src')
100 files changed, 2942 insertions, 1158 deletions
diff --git a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c index 721ba82d8f..9acacbd92d 100644 --- a/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c +++ b/src/core/ext/transport/chttp2/client/secure/secure_channel_create.c @@ -91,11 +91,13 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, connector *c = arg; grpc_closure *notify; gpr_mu_lock(&c->mu); + grpc_error *error = GRPC_ERROR_NONE; if (c->connecting_endpoint == NULL) { memset(c->result, 0, sizeof(*c->result)); gpr_mu_unlock(&c->mu); } else if (status != GRPC_SECURITY_OK) { - gpr_log(GPR_ERROR, "Secure handshake failed with error %d.", status); + error = grpc_error_set_int(GRPC_ERROR_CREATE("Secure handshake failed"), + GRPC_ERROR_INT_SECURITY_STATUS, status); memset(c->result, 0, sizeof(*c->result)); c->connecting_endpoint = NULL; gpr_mu_unlock(&c->mu); @@ -113,7 +115,7 @@ static void on_secure_handshake_done(grpc_exec_ctx *exec_ctx, void *arg, } notify = c->notify; c->notify = NULL; - grpc_exec_ctx_sched(exec_ctx, notify, GRPC_ERROR_NONE, NULL); + grpc_exec_ctx_sched(exec_ctx, notify, error, NULL); } static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg, diff --git a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c index bd87253ed3..7d5279b9da 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_plugin.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_plugin.c @@ -36,11 +36,14 @@ #include "src/core/lib/debug/trace.h" #include "src/core/lib/transport/metadata.h" +extern int grpc_http_write_state_trace; + void grpc_chttp2_plugin_init(void) { grpc_chttp2_base64_encode_and_huffman_compress = grpc_chttp2_base64_encode_and_huffman_compress_impl; grpc_register_tracer("http", &grpc_http_trace); grpc_register_tracer("flowctl", &grpc_flowctl_trace); + grpc_register_tracer("http_write_state", &grpc_http_write_state_trace); } void grpc_chttp2_plugin_shutdown(void) {} diff --git a/src/core/ext/transport/chttp2/transport/chttp2_transport.c b/src/core/ext/transport/chttp2/transport/chttp2_transport.c index e2dd463a77..d050467a02 100644 --- a/src/core/ext/transport/chttp2/transport/chttp2_transport.c +++ b/src/core/ext/transport/chttp2/transport/chttp2_transport.c @@ -48,6 +48,7 @@ #include "src/core/ext/transport/chttp2/transport/status_conversion.h" #include "src/core/ext/transport/chttp2/transport/timeout_encoding.h" #include "src/core/lib/http/parser.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/string.h" #include "src/core/lib/transport/static_metadata.h" @@ -60,9 +61,9 @@ #define DEFAULT_MAX_HEADER_LIST_SIZE (16 * 1024) #define MAX_CLIENT_STREAM_ID 0x7fffffffu - int grpc_http_trace = 0; int grpc_flowctl_trace = 0; +int grpc_http_write_state_trace = 0; #define TRANSPORT_FROM_WRITING(tw) \ ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \ @@ -88,10 +89,16 @@ static const grpc_transport_vtable vtable; static void writing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void reading_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); static void parsing_action(grpc_exec_ctx *exec_ctx, void *t, grpc_error *error); +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *t, + grpc_error *error); + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t); +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error); /** Set a transport level setting, and push it to our peer */ -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value); +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value); /** Start disconnection chain */ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -137,7 +144,7 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global); static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already); static void incoming_byte_stream_destroy_locked(grpc_exec_ctx *exec_ctx, @@ -201,6 +208,7 @@ static void destruct_transport(grpc_exec_ctx *exec_ctx, gpr_free(t); } +/*#define REFCOUNTING_DEBUG 1*/ #ifdef REFCOUNTING_DEBUG #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__) #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__) @@ -231,7 +239,7 @@ static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); } static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, const grpc_channel_args *channel_args, - grpc_endpoint *ep, uint8_t is_client) { + grpc_endpoint *ep, bool is_client) { size_t i; int j; @@ -273,6 +281,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_closure_init(&t->writing_action, writing_action, t); grpc_closure_init(&t->reading_action, reading_action, t); grpc_closure_init(&t->parsing_action, parsing_action, t); + grpc_closure_init(&t->initiate_writing, initiate_writing, t); gpr_slice_buffer_init(&t->parsing.qbuf); grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser); @@ -286,6 +295,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_slice_buffer_add( &t->global.qbuf, gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "initial_write"); } /* 8 is a random stab in the dark as to a good initial size: it's small enough that it shouldn't waste memory for infrequently used connections, yet @@ -311,11 +321,12 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, /* configure http2 the way we like it */ if (is_client) { - push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0); } - push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW); - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, + DEFAULT_WINDOW); + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, DEFAULT_MAX_HEADER_LIST_SIZE); if (channel_args) { @@ -329,7 +340,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be an integer", GRPC_ARG_MAX_CONCURRENT_STREAMS); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -368,7 +379,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE, (uint32_t)channel_args->args[i].value.integer); } } else if (0 == strcmp(channel_args->args[i].key, @@ -393,7 +404,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, gpr_log(GPR_ERROR, "%s: must be non-negative", GRPC_ARG_MAX_METADATA_SIZE); } else { - push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, + push_setting(exec_ctx, t, GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE, (uint32_t)channel_args->args[i].value.integer); } } @@ -444,6 +455,9 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_error *error) { if (!t->closed) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p close transport", t); + } t->closed = 1; connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "close_transport"); @@ -590,7 +604,8 @@ static void destroy_stream_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_metadata_buffer_destroy( &s->global.received_trailing_metadata); gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer); - GRPC_ERROR_UNREF(s->global.removal_error); + GRPC_ERROR_UNREF(s->global.read_closed_error); + GRPC_ERROR_UNREF(s->global.write_closed_error); UNREF_TRANSPORT(exec_ctx, t, "stream"); @@ -634,6 +649,36 @@ grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream( * LOCK MANAGEMENT */ +static const char *write_state_name(grpc_chttp2_write_state state) { + switch (state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + return "INACTIVE"; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + return "REQUESTED[p=0]"; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + return "REQUESTED[p=1]"; + case GRPC_CHTTP2_WRITE_SCHEDULED: + return "SCHEDULED"; + case GRPC_CHTTP2_WRITING: + return "WRITING"; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + return "WRITING[p=1]"; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + return "WRITING[p=0]"; + } + GPR_UNREACHABLE_CODE(return "UNKNOWN"); +} + +static void set_write_state(grpc_chttp2_transport *t, + grpc_chttp2_write_state state, const char *reason) { + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "W:%p %s -> %s because %s", t, + write_state_name(t->executor.write_state), write_state_name(state), + reason); + } + t->executor.write_state = state; +} + static void finish_global_actions(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { grpc_chttp2_executor_action_header *hdr; @@ -642,13 +687,6 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, GPR_TIMER_BEGIN("finish_global_actions", 0); for (;;) { - if (!t->executor.writing_active && !t->closed && - grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { - t->executor.writing_active = 1; - REF_TRANSPORT(t, "writing"); - prevent_endpoint_shutdown(t); - grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); - } check_read_ops(exec_ctx, &t->global); gpr_mu_lock(&t->executor.mu); @@ -669,8 +707,28 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx, continue; } else { t->executor.global_active = false; + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_SCHEDULED, "unlocking"); + REF_TRANSPORT(t, "initiate_writing"); + gpr_mu_unlock(&t->executor.mu); + grpc_exec_ctx_sched( + exec_ctx, &t->initiate_writing, GRPC_ERROR_NONE, + t->ep != NULL ? grpc_endpoint_get_workqueue(t->ep) : NULL); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + start_writing(exec_ctx, t); + gpr_mu_unlock(&t->executor.mu); + break; + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITING: + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + gpr_mu_unlock(&t->executor.mu); + break; + } } - gpr_mu_unlock(&t->executor.mu); break; } @@ -741,16 +799,118 @@ void grpc_chttp2_run_with_global_lock(grpc_exec_ctx *exec_ctx, * OUTPUT PROCESSING */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global) { +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason) { + /* Perform state checks, and transition to a scheduled state if appropriate. + Each time we finish the global lock execution, we check if we need to + write. If we do: + - (if there is a poller surrounding the write) schedule + initiate_writing, which locks and calls initiate_writing_locked to... + - call start_writing, which verifies (under the global lock) that there + are things that need to be written by calling + grpc_chttp2_unlocking_check_writes, and if so schedules writing_action + against the current exec_ctx, to be executed OUTSIDE of the global lock + - eventually writing_action results in grpc_chttp2_terminate_writing being + called, which re-takes the global lock, updates state, checks if we need + to do *another* write immediately, and if so loops back to + start_writing. + + Current problems: + - too much lock entry/exiting + - the writing thread can become stuck indefinitely (punt through the + workqueue periodically to fix) */ + + grpc_chttp2_transport *t = TRANSPORT_FROM_GLOBAL(transport_global); + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + set_write_state(t, covered_by_poller + ? GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER + : GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, reason); + } + break; + case GRPC_CHTTP2_WRITE_SCHEDULED: + /* nothing to do: write already scheduled */ + break; + case GRPC_CHTTP2_WRITING: + set_write_state(t, + covered_by_poller ? GRPC_CHTTP2_WRITING_STALE_WITH_POLLER + : GRPC_CHTTP2_WRITING_STALE_NO_POLLER, + reason); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + /* nothing to do: write already requested */ + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + if (covered_by_poller) { + /* upgrade to note poller is available to cover the write */ + set_write_state(t, GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, reason); + } + break; + } +} + +static void start_writing(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) { + GPR_ASSERT(t->executor.write_state == GRPC_CHTTP2_WRITE_SCHEDULED || + t->executor.write_state == GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER); + if (!t->closed && + grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) { + set_write_state(t, GRPC_CHTTP2_WRITING, "start_writing"); + REF_TRANSPORT(t, "writing"); + prevent_endpoint_shutdown(t); + grpc_exec_ctx_sched(exec_ctx, &t->writing_action, GRPC_ERROR_NONE, NULL); + } else { + if (t->closed) { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:transport_closed"); + } else { + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, + "start_writing:nothing_to_write"); + } + end_waiting_for_write(exec_ctx, t, GRPC_ERROR_CREATE("Nothing to write")); + if (t->ep && !t->endpoint_reading) { + destroy_endpoint(exec_ctx, t); + } + } +} + +static void initiate_writing_locked(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, + grpc_chttp2_stream *s_unused, + void *arg_ignored) { + start_writing(exec_ctx, t); + UNREF_TRANSPORT(exec_ctx, t, "initiate_writing"); +} + +static void initiate_writing(grpc_exec_ctx *exec_ctx, void *arg, + grpc_error *error) { + grpc_chttp2_run_with_global_lock(exec_ctx, arg, NULL, initiate_writing_locked, + NULL, 0); +} + +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason) { if (!TRANSPORT_FROM_GLOBAL(transport_global)->closed && grpc_chttp2_list_add_writable_stream(transport_global, stream_global)) { GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing"); + grpc_chttp2_initiate_write(exec_ctx, transport_global, covered_by_poller, + reason); } } -static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, - uint32_t value) { +static void push_setting(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_chttp2_setting_id id, uint32_t value) { const grpc_chttp2_setting_parameters *sp = &grpc_chttp2_settings_parameters[id]; uint32_t use_value = GPR_CLAMP(value, sp->min_value, sp->max_value); @@ -761,9 +921,22 @@ static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id, if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) { t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value; t->global.dirtied_local_settings = 1; + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "push_setting"); } } +static void end_waiting_for_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport *t, grpc_error *error) { + grpc_chttp2_stream_global *stream_global; + while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, + &stream_global)) { + fail_pending_writes(exec_ctx, &t->global, stream_global, + GRPC_ERROR_REF(error)); + GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + } + GRPC_ERROR_UNREF(error); +} + static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_stream *s_ignored, @@ -778,24 +951,32 @@ static void terminate_writing_with_lock(grpc_exec_ctx *exec_ctx, grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing); - grpc_chttp2_stream_global *stream_global; - while (grpc_chttp2_list_pop_closed_waiting_for_writing(&t->global, - &stream_global)) { - fail_pending_writes(exec_ctx, &t->global, stream_global, - GRPC_ERROR_REF(error)); - GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "finish_writes"); + end_waiting_for_write(exec_ctx, t, error); + + switch (t->executor.write_state) { + case GRPC_CHTTP2_WRITING_INACTIVE: + case GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER: + case GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER: + case GRPC_CHTTP2_WRITE_SCHEDULED: + GPR_UNREACHABLE_CODE(break); + case GRPC_CHTTP2_WRITING: + set_write_state(t, GRPC_CHTTP2_WRITING_INACTIVE, "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_WITH_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + "terminate_writing"); + break; + case GRPC_CHTTP2_WRITING_STALE_NO_POLLER: + set_write_state(t, GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + "terminate_writing"); + break; } - /* leave the writing flag up on shutdown to prevent further writes in - unlock() - from starting */ - t->executor.writing_active = 0; if (t->ep && !t->endpoint_reading) { destroy_endpoint(exec_ctx, t); } UNREF_TRANSPORT(exec_ctx, t, "writing"); - GRPC_ERROR_UNREF(error); } void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx, @@ -878,7 +1059,8 @@ static void maybe_start_some_streams( stream_global->id, STREAM_FROM_GLOBAL(stream_global)); stream_global->in_stream_map = true; transport_global->concurrent_stream_count++; - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, true, + "new_stream"); } /* cancel out streams that will never be started */ while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID && @@ -1018,9 +1200,11 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, maybe_start_some_streams(exec_ctx, transport_global); } else { GPR_ASSERT(stream_global->id != 0); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_initial_metadata"); } } else { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, @@ -1042,7 +1226,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else { stream_global->send_message = op->send_message; if (stream_global->id != 0) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_message"); } } } @@ -1075,6 +1260,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } if (stream_global->write_closed) { + stream_global->send_trailing_metadata = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_trailing_metadata_finished, @@ -1085,7 +1271,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, } else if (stream_global->id != 0) { /* TODO(ctiller): check if there's flow control for any outstanding bytes before going writable */ - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + true, "op.send_trailing_metadata"); } } } @@ -1106,8 +1293,8 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx, (stream_global->incoming_frames.head == NULL || stream_global->incoming_frames.head->is_tail)) { incoming_byte_stream_update_flow_control( - transport_global, stream_global, transport_global->stream_lookahead, - 0); + exec_ctx, transport_global, stream_global, + transport_global->stream_lookahead, 0); } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); } @@ -1135,7 +1322,8 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt, sizeof(*op)); } -static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { +static void send_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, + grpc_closure *on_recv) { grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p)); p->next = &t->global.pings; p->prev = p->next->prev; @@ -1150,6 +1338,7 @@ static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) { p->id[7] = (uint8_t)(t->global.ping_counter & 0xff); p->on_recv = on_recv; gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id)); + grpc_chttp2_initiate_write(exec_ctx, &t->global, true, "send_ping"); } static void ack_ping_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, @@ -1209,6 +1398,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, close_transport = grpc_chttp2_has_streams(t) ? GRPC_ERROR_NONE : GRPC_ERROR_CREATE("GOAWAY sent"); + grpc_chttp2_initiate_write(exec_ctx, &t->global, false, "goaway_sent"); } if (op->set_accept_stream) { @@ -1226,7 +1416,7 @@ static void perform_transport_op_locked(grpc_exec_ctx *exec_ctx, } if (op->send_ping) { - send_ping_locked(t, op->send_ping); + send_ping_locked(exec_ctx, t, op->send_ping); } if (close_transport != GRPC_ERROR_NONE) { @@ -1414,6 +1604,8 @@ static void cancel_from_api(grpc_exec_ctx *exec_ctx, &transport_global->qbuf, grpc_chttp2_rst_stream_create(stream_global->id, (uint32_t)http_error, &stream_global->stats.outgoing)); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "rst_stream"); } const char *msg = @@ -1473,10 +1665,39 @@ void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx, } } +static void add_error(grpc_error *error, grpc_error **refs, size_t *nrefs) { + if (error == GRPC_ERROR_NONE) return; + for (size_t i = 0; i < *nrefs; i++) { + if (error == refs[i]) { + return; + } + } + refs[*nrefs] = error; + ++*nrefs; +} + +static grpc_error *removal_error(grpc_error *extra_error, + grpc_chttp2_stream_global *stream_global) { + grpc_error *refs[3]; + size_t nrefs = 0; + add_error(stream_global->read_closed_error, refs, &nrefs); + add_error(stream_global->write_closed_error, refs, &nrefs); + add_error(extra_error, refs, &nrefs); + grpc_error *error = GRPC_ERROR_NONE; + if (nrefs > 0) { + error = GRPC_ERROR_CREATE_REFERENCING("Failed due to stream removal", refs, + nrefs); + } + GRPC_ERROR_UNREF(extra_error); + return error; +} + static void fail_pending_writes(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, grpc_error *error) { + error = removal_error(error, stream_global); + stream_global->send_message = NULL; grpc_chttp2_complete_closure_step( exec_ctx, transport_global, stream_global, &stream_global->send_initial_metadata_finished, GRPC_ERROR_REF(error)); @@ -1499,14 +1720,17 @@ void grpc_chttp2_mark_stream_closed( } grpc_chttp2_list_add_check_read_ops(transport_global, stream_global); if (close_reads && !stream_global->read_closed) { + stream_global->read_closed_error = GRPC_ERROR_REF(error); stream_global->read_closed = true; stream_global->published_initial_metadata = true; stream_global->published_trailing_metadata = true; decrement_active_streams_locked(exec_ctx, transport_global, stream_global); } if (close_writes && !stream_global->write_closed) { + stream_global->write_closed_error = GRPC_ERROR_REF(error); stream_global->write_closed = true; - if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.writing_active) { + if (TRANSPORT_FROM_GLOBAL(transport_global)->executor.write_state != + GRPC_CHTTP2_WRITING_INACTIVE) { GRPC_CHTTP2_STREAM_REF(stream_global, "finish_writes"); grpc_chttp2_list_add_closed_waiting_for_writing(transport_global, stream_global); @@ -1516,7 +1740,6 @@ void grpc_chttp2_mark_stream_closed( } } if (stream_global->read_closed && stream_global->write_closed) { - stream_global->removal_error = GRPC_ERROR_REF(error); if (stream_global->id != 0 && TRANSPORT_FROM_GLOBAL(transport_global)->executor.parsing_active) { grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global, @@ -1524,7 +1747,8 @@ void grpc_chttp2_mark_stream_closed( } else { if (stream_global->id != 0) { remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global), - stream_global->id, GRPC_ERROR_REF(error)); + stream_global->id, + removal_error(GRPC_ERROR_REF(error), stream_global)); } GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1649,6 +1873,8 @@ static void close_from_api(grpc_exec_ctx *exec_ctx, grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1, 1, error); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "close_from_api"); } typedef struct { @@ -1678,8 +1904,14 @@ static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, } /** update window from a settings change */ +typedef struct { + grpc_chttp2_transport *t; + grpc_exec_ctx *exec_ctx; +} update_global_window_args; + static void update_global_window(void *args, uint32_t id, void *stream) { - grpc_chttp2_transport *t = args; + update_global_window_args *a = args; + grpc_chttp2_transport *t = a->t; grpc_chttp2_stream *s = stream; grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_stream_global *stream_global = &s->global; @@ -1693,7 +1925,8 @@ static void update_global_window(void *args, uint32_t id, void *stream) { is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(a->exec_ctx, transport_global, stream_global, + true, "update_global_window"); } } @@ -1801,14 +2034,19 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, grpc_chttp2_transport_global *transport_global = &t->global; grpc_chttp2_transport_parsing *transport_parsing = &t->parsing; /* copy parsing qbuf to global qbuf */ - gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + if (t->parsing.qbuf.count > 0) { + gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "parsing_qbuf"); + } /* merge stream lists */ grpc_chttp2_stream_map_move_into(&t->new_stream_map, &t->parsing_stream_map); transport_global->concurrent_stream_count = (uint32_t)grpc_chttp2_stream_map_size(&t->parsing_stream_map); if (transport_parsing->initial_window_update != 0) { + update_global_window_args args = {t, exec_ctx}; grpc_chttp2_stream_map_for_each(&t->parsing_stream_map, - update_global_window, t); + update_global_window, &args); transport_parsing->initial_window_update = 0; } /* handle higher level things */ @@ -1831,7 +2069,7 @@ static void post_parse_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t, GPR_ASSERT(stream_global->write_closed); GPR_ASSERT(stream_global->read_closed); remove_stream(exec_ctx, t, stream_global->id, - GRPC_ERROR_REF(stream_global->removal_error)); + removal_error(GRPC_ERROR_NONE, stream_global)); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2"); } @@ -1854,11 +2092,12 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx, } drop_connection(exec_ctx, t, GRPC_ERROR_REF(error)); t->endpoint_reading = 0; - if (!t->executor.writing_active && t->ep) { - grpc_endpoint_destroy(exec_ctx, t->ep); - t->ep = NULL; - /* safe as we still have a ref for read */ - UNREF_TRANSPORT(exec_ctx, t, "disconnect"); + if (grpc_http_write_state_trace) { + gpr_log(GPR_DEBUG, "R:%p -> 0 ws=%s", t, + write_state_name(t->executor.write_state)); + } + if (t->executor.write_state == GRPC_CHTTP2_WRITING_INACTIVE && t->ep) { + destroy_endpoint(exec_ctx, t); } } else if (!t->closed) { keep_reading = true; @@ -1942,7 +2181,7 @@ static void incoming_byte_stream_unref(grpc_exec_ctx *exec_ctx, } static void incoming_byte_stream_update_flow_control( - grpc_chttp2_transport_global *transport_global, + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global, grpc_chttp2_stream_global *stream_global, size_t max_size_hint, size_t have_already) { uint32_t max_recv_bytes; @@ -1977,7 +2216,8 @@ static void incoming_byte_stream_update_flow_control( add_max_recv_bytes); grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global, stream_global); - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "read_incoming_stream"); } } @@ -1999,8 +2239,9 @@ static void incoming_byte_stream_next_locked(grpc_exec_ctx *exec_ctx, grpc_chttp2_stream_global *stream_global = &bs->stream->global; if (bs->is_tail) { - incoming_byte_stream_update_flow_control( - transport_global, stream_global, arg->max_size_hint, bs->slices.length); + incoming_byte_stream_update_flow_control(exec_ctx, transport_global, + stream_global, arg->max_size_hint, + bs->slices.length); } if (bs->slices.count > 0) { *arg->slice = gpr_slice_buffer_take_first(&bs->slices); @@ -2184,7 +2425,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, if (context == NULL) { *scope = NULL; gpr_asprintf(&buf, "%s(%" PRId64 ")", var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2197,7 +2438,7 @@ static char *format_flowctl_context_var(const char *context, const char *var, gpr_free(tmp); } gpr_asprintf(&buf, "%s.%s(%" PRId64 ")", underscore_pos + 1, var, val); - result = gpr_leftpad(buf, ' ', 40); + result = gpr_leftpad(buf, ' ', 60); gpr_free(buf); return result; } @@ -2230,7 +2471,7 @@ void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase, tmp_phase = gpr_leftpad(phase, ' ', 8); tmp_scope1 = gpr_leftpad(scope1, ' ', 11); - gpr_asprintf(&prefix, "FLOW %s: %s %s ", phase, clisvr, scope1); + gpr_asprintf(&prefix, "FLOW %s: %s %s ", tmp_phase, clisvr, scope1); gpr_free(tmp_phase); gpr_free(tmp_scope1); diff --git a/src/core/ext/transport/chttp2/transport/internal.h b/src/core/ext/transport/chttp2/transport/internal.h index 8d79e93ceb..e1dcf5262a 100644 --- a/src/core/ext/transport/chttp2/transport/internal.h +++ b/src/core/ext/transport/chttp2/transport/internal.h @@ -305,6 +305,22 @@ typedef struct grpc_chttp2_executor_action_header { void *arg; } grpc_chttp2_executor_action_header; +typedef enum { + /** no writing activity */ + GRPC_CHTTP2_WRITING_INACTIVE, + /** write has been requested, but not scheduled yet */ + GRPC_CHTTP2_WRITE_REQUESTED_WITH_POLLER, + GRPC_CHTTP2_WRITE_REQUESTED_NO_POLLER, + /** write has been requested and scheduled against the workqueue */ + GRPC_CHTTP2_WRITE_SCHEDULED, + /** write has been initiated after being reaped from the workqueue */ + GRPC_CHTTP2_WRITING, + /** write has been initiated, AND another write needs to be started once it's + done */ + GRPC_CHTTP2_WRITING_STALE_WITH_POLLER, + GRPC_CHTTP2_WRITING_STALE_NO_POLLER, +} grpc_chttp2_write_state; + struct grpc_chttp2_transport { grpc_transport base; /* must be first */ gpr_refcount refs; @@ -319,10 +335,10 @@ struct grpc_chttp2_transport { /** is a thread currently in the global lock */ bool global_active; - /** is a thread currently writing */ - bool writing_active; /** is a thread currently parsing */ bool parsing_active; + /** write execution state of the transport */ + grpc_chttp2_write_state write_state; grpc_chttp2_executor_action_header *pending_actions_head; grpc_chttp2_executor_action_header *pending_actions_tail; @@ -342,7 +358,8 @@ struct grpc_chttp2_transport { /** global state for reading/writing */ grpc_chttp2_transport_global global; /** state only accessible by the chain of execution that - set writing_active=1 */ + set writing_state >= GRPC_WRITING, and only by the writing closure + chain. */ grpc_chttp2_transport_writing writing; /** state only accessible by the chain of execution that set parsing_active=1 */ @@ -363,6 +380,8 @@ struct grpc_chttp2_transport { grpc_closure reading_action; /** closure to actually do parsing */ grpc_closure parsing_action; + /** closure to initiate writing */ + grpc_closure initiate_writing; /** incoming read bytes */ gpr_slice_buffer read_buffer; @@ -436,8 +455,10 @@ typedef struct { bool seen_error; bool exceeded_metadata_size; - /** the error that resulted in this stream being removed */ - grpc_error *removal_error; + /** the error that resulted in this stream being read-closed */ + grpc_error *read_closed_error; + /** the error that resulted in this stream being write-closed */ + grpc_error *write_closed_error; bool published_initial_metadata; bool published_trailing_metadata; @@ -514,15 +535,20 @@ struct grpc_chttp2_stream { }; /** Transport writing call flow: - chttp2_transport.c calls grpc_chttp2_unlocking_check_writes to see if writes - are required; - if they are, chttp2_transport.c calls grpc_chttp2_perform_writes to do the - writes. - Once writes have been completed (meaning another write could potentially be - started), - grpc_chttp2_terminate_writing is called. This will call - grpc_chttp2_cleanup_writing, at which - point the write phase is complete. */ + grpc_chttp2_initiate_write() is called anywhere that we know bytes need to + go out on the wire. + If no other write has been started, a task is enqueued onto our workqueue. + When that task executes, it obtains the global lock, and gathers the data + to write. + The global lock is dropped and we do the syscall to write. + After writing, a follow-up check is made to see if another round of writing + should be performed. + + The actual call chain is documented in the implementation of this function. + */ +void grpc_chttp2_initiate_write(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + bool covered_by_poller, const char *reason); /** Someone is unlocking the transport mutex: check to see if writes are required, and schedule them if so */ @@ -610,9 +636,8 @@ int grpc_chttp2_list_pop_check_read_ops( void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing); -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available); +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing); void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, @@ -822,7 +847,9 @@ void grpc_chttp2_ack_ping(grpc_exec_ctx *exec_ctx, /** add a ref to the stream and add it to the writable list; ref will be dropped in writing.c */ -void grpc_chttp2_become_writable(grpc_chttp2_transport_global *transport_global, - grpc_chttp2_stream_global *stream_global); +void grpc_chttp2_become_writable(grpc_exec_ctx *exec_ctx, + grpc_chttp2_transport_global *transport_global, + grpc_chttp2_stream_global *stream_global, + bool covered_by_poller, const char *reason); #endif /* GRPC_CORE_EXT_TRANSPORT_CHTTP2_TRANSPORT_INTERNAL_H */ diff --git a/src/core/ext/transport/chttp2/transport/parsing.c b/src/core/ext/transport/chttp2/transport/parsing.c index 84eb5752f1..e1fc0ddee2 100644 --- a/src/core/ext/transport/chttp2/transport/parsing.c +++ b/src/core/ext/transport/chttp2/transport/parsing.c @@ -154,10 +154,8 @@ void grpc_chttp2_publish_reads( transport_parsing, outgoing_window); is_zero = transport_global->outgoing_window <= 0; if (was_zero && !is_zero) { - while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, - &stream_global)) { - grpc_chttp2_become_writable(transport_global, stream_global); - } + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "new_global_flow_control"); } if (transport_parsing->incoming_window < @@ -168,6 +166,8 @@ void grpc_chttp2_publish_reads( announce_incoming_window, announce_bytes); GRPC_CHTTP2_FLOW_CREDIT_TRANSPORT("parsed", transport_parsing, incoming_window, announce_bytes); + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "global incoming window"); } /* for each stream that saw an update, fixup global state */ @@ -190,7 +190,8 @@ void grpc_chttp2_publish_reads( outgoing_window); is_zero = stream_global->outgoing_window <= 0; if (was_zero && !is_zero) { - grpc_chttp2_become_writable(transport_global, stream_global); + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "stream.read_flow_control"); } stream_global->max_recv_bytes -= (uint32_t)GPR_MIN( diff --git a/src/core/ext/transport/chttp2/transport/stream_lists.c b/src/core/ext/transport/chttp2/transport/stream_lists.c index 8f3ab00e6d..2eb5f5f632 100644 --- a/src/core/ext/transport/chttp2/transport/stream_lists.c +++ b/src/core/ext/transport/chttp2/transport/stream_lists.c @@ -329,6 +329,7 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { grpc_chttp2_stream *stream = STREAM_FROM_WRITING(stream_writing); + gpr_log(GPR_DEBUG, "writing stalled %d", stream->global.id); if (!stream->included[GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT]) { GRPC_CHTTP2_STREAM_REF(&stream->global, "chttp2_writing_stalled"); } @@ -336,27 +337,28 @@ void grpc_chttp2_list_add_writing_stalled_by_transport( GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT); } -void grpc_chttp2_list_flush_writing_stalled_by_transport( - grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing, - bool is_window_available) { +bool grpc_chttp2_list_flush_writing_stalled_by_transport( + grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing) { grpc_chttp2_stream *stream; + bool out = false; grpc_chttp2_transport *transport = TRANSPORT_FROM_WRITING(transport_writing); while (stream_list_pop(transport, &stream, GRPC_CHTTP2_LIST_WRITING_STALLED_BY_TRANSPORT)) { - if (is_window_available) { - grpc_chttp2_become_writable(&transport->global, &stream->global); - } else { - grpc_chttp2_list_add_stalled_by_transport(transport_writing, - &stream->writing); - } + gpr_log(GPR_DEBUG, "move %d from writing stalled to just stalled", + stream->global.id); + grpc_chttp2_list_add_stalled_by_transport(transport_writing, + &stream->writing); GRPC_CHTTP2_STREAM_UNREF(exec_ctx, &stream->global, "chttp2_writing_stalled"); + out = true; } + return out; } void grpc_chttp2_list_add_stalled_by_transport( grpc_chttp2_transport_writing *transport_writing, grpc_chttp2_stream_writing *stream_writing) { + gpr_log(GPR_DEBUG, "stalled %d", stream_writing->id); stream_list_add(TRANSPORT_FROM_WRITING(transport_writing), STREAM_FROM_WRITING(stream_writing), GRPC_CHTTP2_LIST_STALLED_BY_TRANSPORT); diff --git a/src/core/ext/transport/chttp2/transport/writing.c b/src/core/ext/transport/chttp2/transport/writing.c index b19f5f068d..e0d87725e9 100644 --- a/src/core/ext/transport/chttp2/transport/writing.c +++ b/src/core/ext/transport/chttp2/transport/writing.c @@ -75,9 +75,13 @@ int grpc_chttp2_unlocking_check_writes( GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window, transport_global, outgoing_window); - bool is_window_available = transport_writing->outgoing_window > 0; - grpc_chttp2_list_flush_writing_stalled_by_transport( - exec_ctx, transport_writing, is_window_available); + if (transport_writing->outgoing_window > 0) { + while (grpc_chttp2_list_pop_stalled_by_transport(transport_global, + &stream_global)) { + grpc_chttp2_become_writable(exec_ctx, transport_global, stream_global, + false, "transport.read_flow_control"); + } + } /* for each grpc_chttp2_stream that's become writable, frame it's data (according to available window sizes) and add to the output buffer */ @@ -331,6 +335,12 @@ void grpc_chttp2_cleanup_writing( grpc_chttp2_stream_writing *stream_writing; grpc_chttp2_stream_global *stream_global; + if (grpc_chttp2_list_flush_writing_stalled_by_transport(exec_ctx, + transport_writing)) { + grpc_chttp2_initiate_write(exec_ctx, transport_global, false, + "resume_stalled_stream"); + } + while (grpc_chttp2_list_pop_written_stream( transport_global, transport_writing, &stream_global, &stream_writing)) { if (stream_writing->sent_initial_metadata) { diff --git a/src/core/lib/iomgr/endpoint.c b/src/core/lib/iomgr/endpoint.c index 1ab3733d38..f901fcf962 100644 --- a/src/core/lib/iomgr/endpoint.c +++ b/src/core/lib/iomgr/endpoint.c @@ -65,3 +65,7 @@ void grpc_endpoint_destroy(grpc_exec_ctx* exec_ctx, grpc_endpoint* ep) { char* grpc_endpoint_get_peer(grpc_endpoint* ep) { return ep->vtable->get_peer(ep); } + +grpc_workqueue* grpc_endpoint_get_workqueue(grpc_endpoint* ep) { + return ep->vtable->get_workqueue(ep); +} diff --git a/src/core/lib/iomgr/endpoint.h b/src/core/lib/iomgr/endpoint.h index f9808bbda1..894efc0b23 100644 --- a/src/core/lib/iomgr/endpoint.h +++ b/src/core/lib/iomgr/endpoint.h @@ -51,6 +51,7 @@ struct grpc_endpoint_vtable { gpr_slice_buffer *slices, grpc_closure *cb); void (*write)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, gpr_slice_buffer *slices, grpc_closure *cb); + grpc_workqueue *(*get_workqueue)(grpc_endpoint *ep); void (*add_to_pollset)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, grpc_pollset *pollset); void (*add_to_pollset_set)(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, @@ -69,6 +70,9 @@ void grpc_endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, char *grpc_endpoint_get_peer(grpc_endpoint *ep); +/* Retrieve a reference to the workqueue associated with this endpoint */ +grpc_workqueue *grpc_endpoint_get_workqueue(grpc_endpoint *ep); + /* Write slices out to the socket. If the connection is ready for more data after the end of the call, it diff --git a/src/core/lib/iomgr/ev_epoll_linux.c b/src/core/lib/iomgr/ev_epoll_linux.c index cf0fe736a0..6a63c4d1d1 100644 --- a/src/core/lib/iomgr/ev_epoll_linux.c +++ b/src/core/lib/iomgr/ev_epoll_linux.c @@ -57,6 +57,7 @@ #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/iomgr_internal.h" #include "src/core/lib/iomgr/wakeup_fd_posix.h" +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" #include "src/core/lib/support/block_annotate.h" @@ -113,9 +114,7 @@ struct grpc_fd { grpc_closure *read_closure; grpc_closure *write_closure; - /* The polling island to which this fd belongs to and the mutex protecting the - the field */ - gpr_mu pi_mu; + /* The polling island to which this fd belongs to (protected by mu) */ struct polling_island *polling_island; struct grpc_fd *freelist_next; @@ -152,16 +151,17 @@ static void fd_global_shutdown(void); * Polling island Declarations */ -// #define GRPC_PI_REF_COUNT_DEBUG +//#define GRPC_PI_REF_COUNT_DEBUG #ifdef GRPC_PI_REF_COUNT_DEBUG #define PI_ADD_REF(p, r) pi_add_ref_dbg((p), (r), __FILE__, __LINE__) -#define PI_UNREF(p, r) pi_unref_dbg((p), (r), __FILE__, __LINE__) +#define PI_UNREF(exec_ctx, p, r) \ + pi_unref_dbg((exec_ctx), (p), (r), __FILE__, __LINE__) #else /* defined(GRPC_PI_REF_COUNT_DEBUG) */ #define PI_ADD_REF(p, r) pi_add_ref((p)) -#define PI_UNREF(p, r) pi_unref((p)) +#define PI_UNREF(exec_ctx, p, r) pi_unref((exec_ctx), (p)) #endif /* !defined(GPRC_PI_REF_COUNT_DEBUG) */ @@ -172,7 +172,7 @@ typedef struct polling_island { Once the ref count becomes zero, this structure is destroyed which means we should ensure that there is never a scenario where a PI_ADD_REF() is racing with a PI_UNREF() that just made the ref_count zero. */ - gpr_refcount ref_count; + gpr_atm ref_count; /* Pointer to the polling_island this merged into. * merged_to value is only set once in polling_island's lifetime (and that too @@ -184,6 +184,9 @@ typedef struct polling_island { * (except mu and ref_count) are invalid and must be ignored. */ gpr_atm merged_to; + /* The workqueue associated with this polling island */ + grpc_workqueue *workqueue; + /* The fd of the underlying epoll set */ int epoll_fd; @@ -191,11 +194,6 @@ typedef struct polling_island { size_t fd_cnt; size_t fd_capacity; grpc_fd **fds; - - /* Polling islands that are no longer needed are kept in a freelist so that - they can be reused. This field points to the next polling island in the - free list */ - struct polling_island *next_free; } polling_island; /******************************************************************************* @@ -253,13 +251,14 @@ struct grpc_pollset_set { * Common helpers */ -static void append_error(grpc_error **composite, grpc_error *error, +static bool append_error(grpc_error **composite, grpc_error *error, const char *desc) { - if (error == GRPC_ERROR_NONE) return; + if (error == GRPC_ERROR_NONE) return true; if (*composite == GRPC_ERROR_NONE) { *composite = GRPC_ERROR_CREATE(desc); } *composite = grpc_error_add_child(*composite, error); + return false; } /******************************************************************************* @@ -275,11 +274,8 @@ static void append_error(grpc_error **composite, grpc_error *error, threads that woke up MUST NOT call grpc_wakeup_fd_consume_wakeup() */ static grpc_wakeup_fd polling_island_wakeup_fd; -/* Polling island freelist */ -static gpr_mu g_pi_freelist_mu; -static polling_island *g_pi_freelist = NULL; - -static void polling_island_delete(); /* Forward declaration */ +/* Forward declaration */ +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi); #ifdef GRPC_TSAN /* Currently TSAN may incorrectly flag data races between epoll_ctl and @@ -293,28 +289,35 @@ gpr_atm g_epoll_sync; #endif /* defined(GRPC_TSAN) */ #ifdef GRPC_PI_REF_COUNT_DEBUG -void pi_add_ref(polling_island *pi); -void pi_unref(polling_island *pi); +static void pi_add_ref(polling_island *pi); +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi); -void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); +static void pi_add_ref_dbg(polling_island *pi, char *reason, char *file, + int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); pi_add_ref(pi); gpr_log(GPR_DEBUG, "Add ref pi: %p, old: %ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, old_cnt + 1, reason, file, line); } -void pi_unref_dbg(polling_island *pi, char *reason, char *file, int line) { - long old_cnt = gpr_atm_acq_load(&(pi->ref_count.count)); - pi_unref(pi); +static void pi_unref_dbg(grpc_exec_ctx *exec_ctx, polling_island *pi, + char *reason, char *file, int line) { + long old_cnt = gpr_atm_acq_load(&pi->ref_count); + pi_unref(exec_ctx, pi); gpr_log(GPR_DEBUG, "Unref pi: %p, old:%ld -> new:%ld (%s) - (%s, %d)", (void *)pi, old_cnt, (old_cnt - 1), reason, file, line); } #endif -void pi_add_ref(polling_island *pi) { gpr_ref(&pi->ref_count); } +static void pi_add_ref(polling_island *pi) { + gpr_atm_no_barrier_fetch_add(&pi->ref_count, 1); +} -void pi_unref(polling_island *pi) { - /* If ref count went to zero, delete the polling island. +static void pi_unref(grpc_exec_ctx *exec_ctx, polling_island *pi) { + /* If ref count went to one, we're back to just the workqueue owning a ref. + Unref the workqueue to break the loop. + + If ref count went to zero, delete the polling island. Note that this deletion not be done under a lock. Once the ref count goes to zero, we are guaranteed that no one else holds a reference to the polling island (and that there is no racing pi_add_ref() call either). @@ -322,12 +325,20 @@ void pi_unref(polling_island *pi) { Also, if we are deleting the polling island and the merged_to field is non-empty, we should remove a ref to the merged_to polling island */ - if (gpr_unref(&pi->ref_count)) { - polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); - polling_island_delete(pi); - if (next != NULL) { - PI_UNREF(next, "pi_delete"); /* Recursive call */ + switch (gpr_atm_full_fetch_add(&pi->ref_count, -1)) { + case 2: /* last external ref: the only one now owned is by the workqueue */ + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + break; + case 1: { + polling_island *next = (polling_island *)gpr_atm_acq_load(&pi->merged_to); + polling_island_delete(exec_ctx, pi); + if (next != NULL) { + PI_UNREF(exec_ctx, next, "pi_delete"); /* Recursive call */ + } + break; } + case 0: + GPR_UNREACHABLE_CODE(return ); } } @@ -462,69 +473,68 @@ static void polling_island_remove_fd_locked(polling_island *pi, grpc_fd *fd, } /* Might return NULL in case of an error */ -static polling_island *polling_island_create(grpc_fd *initial_fd, +static polling_island *polling_island_create(grpc_exec_ctx *exec_ctx, + grpc_fd *initial_fd, grpc_error **error) { polling_island *pi = NULL; - char *err_msg; const char *err_desc = "polling_island_create"; - /* Try to get one from the polling island freelist */ - gpr_mu_lock(&g_pi_freelist_mu); - if (g_pi_freelist != NULL) { - pi = g_pi_freelist; - g_pi_freelist = g_pi_freelist->next_free; - pi->next_free = NULL; - } - gpr_mu_unlock(&g_pi_freelist_mu); + *error = GRPC_ERROR_NONE; - /* Create new polling island if we could not get one from the free list */ - if (pi == NULL) { - pi = gpr_malloc(sizeof(*pi)); - gpr_mu_init(&pi->mu); - pi->fd_cnt = 0; - pi->fd_capacity = 0; - pi->fds = NULL; - } + pi = gpr_malloc(sizeof(*pi)); + gpr_mu_init(&pi->mu); + pi->fd_cnt = 0; + pi->fd_capacity = 0; + pi->fds = NULL; + pi->epoll_fd = -1; + pi->workqueue = NULL; - gpr_ref_init(&pi->ref_count, 0); + gpr_atm_rel_store(&pi->ref_count, 0); gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); pi->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (pi->epoll_fd < 0) { - gpr_asprintf(&err_msg, "epoll_create1 failed with error %d (%s)", errno, - strerror(errno)); - append_error(error, GRPC_OS_ERROR(errno, err_msg), err_desc); - gpr_free(err_msg); - } else { - polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); - pi->next_free = NULL; + append_error(error, GRPC_OS_ERROR(errno, "epoll_create1"), err_desc); + goto done; + } - if (initial_fd != NULL) { - /* Lock the polling island here just in case we got this structure from - the freelist and the polling island lock was not released yet (by the - code that adds the polling island to the freelist) */ - gpr_mu_lock(&pi->mu); - polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); - gpr_mu_unlock(&pi->mu); - } + polling_island_add_wakeup_fd_locked(pi, &grpc_global_wakeup_fd, error); + + if (initial_fd != NULL) { + polling_island_add_fds_locked(pi, &initial_fd, 1, true, error); + } + + if (append_error(error, grpc_workqueue_create(exec_ctx, &pi->workqueue), + err_desc) && + *error == GRPC_ERROR_NONE) { + polling_island_add_fds_locked(pi, &pi->workqueue->wakeup_read_fd, 1, true, + error); + GPR_ASSERT(pi->workqueue->wakeup_read_fd->polling_island == NULL); + pi->workqueue->wakeup_read_fd->polling_island = pi; + PI_ADD_REF(pi, "fd"); } +done: + if (*error != GRPC_ERROR_NONE) { + if (pi->workqueue != NULL) { + GRPC_WORKQUEUE_UNREF(exec_ctx, pi->workqueue, "polling_island"); + } + polling_island_delete(exec_ctx, pi); + pi = NULL; + } return pi; } -static void polling_island_delete(polling_island *pi) { +static void polling_island_delete(grpc_exec_ctx *exec_ctx, polling_island *pi) { GPR_ASSERT(pi->fd_cnt == 0); - gpr_atm_rel_store(&pi->merged_to, (gpr_atm)NULL); - - close(pi->epoll_fd); - pi->epoll_fd = -1; - - gpr_mu_lock(&g_pi_freelist_mu); - pi->next_free = g_pi_freelist; - g_pi_freelist = pi; - gpr_mu_unlock(&g_pi_freelist_mu); + if (pi->epoll_fd >= 0) { + close(pi->epoll_fd); + } + gpr_mu_destroy(&pi->mu); + gpr_free(pi->fds); + gpr_free(pi); } /* Attempts to gets the last polling island in the linked list (liked by the @@ -704,9 +714,6 @@ static polling_island *polling_island_merge(polling_island *p, static grpc_error *polling_island_global_init() { grpc_error *error = GRPC_ERROR_NONE; - gpr_mu_init(&g_pi_freelist_mu); - g_pi_freelist = NULL; - error = grpc_wakeup_fd_init(&polling_island_wakeup_fd); if (error == GRPC_ERROR_NONE) { error = grpc_wakeup_fd_wakeup(&polling_island_wakeup_fd); @@ -716,18 +723,6 @@ static grpc_error *polling_island_global_init() { } static void polling_island_global_shutdown() { - polling_island *next; - gpr_mu_lock(&g_pi_freelist_mu); - gpr_mu_unlock(&g_pi_freelist_mu); - while (g_pi_freelist != NULL) { - next = g_pi_freelist->next_free; - gpr_mu_destroy(&g_pi_freelist->mu); - gpr_free(g_pi_freelist->fds); - gpr_free(g_pi_freelist); - g_pi_freelist = next; - } - gpr_mu_destroy(&g_pi_freelist_mu); - grpc_wakeup_fd_destroy(&polling_island_wakeup_fd); } @@ -845,7 +840,6 @@ static grpc_fd *fd_create(int fd, const char *name) { if (new_fd == NULL) { new_fd = gpr_malloc(sizeof(grpc_fd)); gpr_mu_init(&new_fd->mu); - gpr_mu_init(&new_fd->pi_mu); } /* Note: It is not really needed to get the new_fd->mu lock here. If this is a @@ -896,6 +890,7 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, const char *reason) { bool is_fd_closed = false; grpc_error *error = GRPC_ERROR_NONE; + polling_island *unref_pi = NULL; gpr_mu_lock(&fd->mu); fd->on_done_closure = on_done; @@ -923,21 +918,26 @@ static void fd_orphan(grpc_exec_ctx *exec_ctx, grpc_fd *fd, - Unlock the latest polling island - Set fd->polling_island to NULL (but remove the ref on the polling island before doing this.) */ - gpr_mu_lock(&fd->pi_mu); if (fd->polling_island != NULL) { polling_island *pi_latest = polling_island_lock(fd->polling_island); polling_island_remove_fd_locked(pi_latest, fd, is_fd_closed, &error); gpr_mu_unlock(&pi_latest->mu); - PI_UNREF(fd->polling_island, "fd_orphan"); + unref_pi = fd->polling_island; fd->polling_island = NULL; } - gpr_mu_unlock(&fd->pi_mu); grpc_exec_ctx_sched(exec_ctx, fd->on_done_closure, error, NULL); gpr_mu_unlock(&fd->mu); UNREF_BY(fd, 2, reason); /* Drop the reference */ + if (unref_pi != NULL) { + /* Unref stale polling island here, outside the fd lock above. + The polling island owns a workqueue which owns an fd, and unreffing + inside the lock can cause an eventual lock loop that makes TSAN very + unhappy. */ + PI_UNREF(exec_ctx, unref_pi, "fd_orphan"); + } GRPC_LOG_IF_ERROR("fd_orphan", GRPC_ERROR_REF(error)); } @@ -1037,6 +1037,17 @@ static void fd_notify_on_write(grpc_exec_ctx *exec_ctx, grpc_fd *fd, gpr_mu_unlock(&fd->mu); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { + gpr_mu_lock(&fd->mu); + grpc_workqueue *workqueue = NULL; + if (fd->polling_island != NULL) { + workqueue = + GRPC_WORKQUEUE_REF(fd->polling_island->workqueue, "get_workqueue"); + } + gpr_mu_unlock(&fd->mu); + return workqueue; +} + /******************************************************************************* * Pollset Definitions */ @@ -1227,9 +1238,10 @@ static void fd_become_writable(grpc_exec_ctx *exec_ctx, grpc_fd *fd) { gpr_mu_unlock(&fd->mu); } -static void pollset_release_polling_island(grpc_pollset *ps, char *reason) { +static void pollset_release_polling_island(grpc_exec_ctx *exec_ctx, + grpc_pollset *ps, char *reason) { if (ps->polling_island != NULL) { - PI_UNREF(ps->polling_island, reason); + PI_UNREF(exec_ctx, ps->polling_island, reason); } ps->polling_island = NULL; } @@ -1242,7 +1254,7 @@ static void finish_shutdown_locked(grpc_exec_ctx *exec_ctx, pollset->finish_shutdown_called = true; /* Release the ref and set pollset->polling_island to NULL */ - pollset_release_polling_island(pollset, "ps_shutdown"); + pollset_release_polling_island(exec_ctx, pollset, "ps_shutdown"); grpc_exec_ctx_sched(exec_ctx, pollset->shutdown_done, GRPC_ERROR_NONE, NULL); } @@ -1281,7 +1293,7 @@ static void pollset_reset(grpc_pollset *pollset) { pollset->finish_shutdown_called = false; pollset->kicked_without_pollers = false; pollset->shutdown_done = NULL; - pollset_release_polling_island(pollset, "ps_reset"); + GPR_ASSERT(pollset->polling_island == NULL); } #define GRPC_EPOLL_MAX_EVENTS 1000 @@ -1309,7 +1321,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, this function (i.e pollset_work_and_unlock()) is called */ if (pollset->polling_island == NULL) { - pollset->polling_island = polling_island_create(NULL, error); + pollset->polling_island = polling_island_create(exec_ctx, NULL, error); if (pollset->polling_island == NULL) { GPR_TIMER_END("pollset_work_and_unlock", 0); return; /* Fatal error. We cannot continue */ @@ -1329,7 +1341,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, /* Always do PI_ADD_REF before PI_UNREF because PI_UNREF may cause the polling island to be deleted */ PI_ADD_REF(pi, "ps"); - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); pollset->polling_island = pi; } @@ -1400,7 +1412,7 @@ static void pollset_work_and_unlock(grpc_exec_ctx *exec_ctx, that we got before releasing the polling island lock). This is because pollset->polling_island pointer might get udpated in other parts of the code when there is an island merge while we are doing epoll_wait() above */ - PI_UNREF(pi, "ps_work"); + PI_UNREF(exec_ctx, pi, "ps_work"); GPR_TIMER_END("pollset_work_and_unlock", 0); } @@ -1517,10 +1529,11 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_error *error = GRPC_ERROR_NONE; gpr_mu_lock(&pollset->mu); - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); polling_island *pi_new = NULL; +retry: /* 1) If fd->polling_island and pollset->polling_island are both non-NULL and * equal, do nothing. * 2) If fd->polling_island and pollset->polling_island are both NULL, create @@ -1535,15 +1548,44 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, * polling_island fields in both fd and pollset to point to the merged * polling island. */ + + if (fd->orphaned) { + gpr_mu_unlock(&fd->mu); + gpr_mu_unlock(&pollset->mu); + /* early out */ + return; + } + if (fd->polling_island == pollset->polling_island) { pi_new = fd->polling_island; if (pi_new == NULL) { - pi_new = polling_island_create(fd, &error); - - GRPC_POLLING_TRACE( - "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " - "pollset: %p)", - (void *)pi_new, fd->fd, (void *)pollset); + /* Unlock before creating a new polling island: the polling island will + create a workqueue which creates a file descriptor, and holding an fd + lock here can eventually cause a loop to appear to TSAN (making it + unhappy). We don't think it's a real loop (there's an epoch point where + that loop possibility disappears), but the advantages of keeping TSAN + happy outweigh any performance advantage we might have by keeping the + lock held. */ + gpr_mu_unlock(&fd->mu); + pi_new = polling_island_create(exec_ctx, fd, &error); + gpr_mu_lock(&fd->mu); + /* Need to reverify any assumptions made between the initial lock and + getting to this branch: if they've changed, we need to throw away our + work and figure things out again. */ + if (fd->polling_island != NULL) { + GRPC_POLLING_TRACE( + "pollset_add_fd: Raced creating new polling island. pi_new: %p " + "(fd: %d, pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + PI_ADD_REF(pi_new, "dance_of_destruction"); + PI_UNREF(exec_ctx, pi_new, "dance_of_destruction"); + goto retry; + } else { + GRPC_POLLING_TRACE( + "pollset_add_fd: Created new polling island. pi_new: %p (fd: %d, " + "pollset: %p)", + (void *)pi_new, fd->fd, (void *)pollset); + } } } else if (fd->polling_island == NULL) { pi_new = polling_island_lock(pollset->polling_island); @@ -1579,7 +1621,7 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (fd->polling_island != pi_new) { PI_ADD_REF(pi_new, "fd"); if (fd->polling_island != NULL) { - PI_UNREF(fd->polling_island, "fd"); + PI_UNREF(exec_ctx, fd->polling_island, "fd"); } fd->polling_island = pi_new; } @@ -1587,13 +1629,15 @@ static void pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, if (pollset->polling_island != pi_new) { PI_ADD_REF(pi_new, "ps"); if (pollset->polling_island != NULL) { - PI_UNREF(pollset->polling_island, "ps"); + PI_UNREF(exec_ctx, pollset->polling_island, "ps"); } pollset->polling_island = pi_new; } - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); gpr_mu_unlock(&pollset->mu); + + GRPC_LOG_IF_ERROR("pollset_add_fd", error); } /******************************************************************************* @@ -1744,9 +1788,9 @@ static void pollset_set_del_pollset_set(grpc_exec_ctx *exec_ctx, void *grpc_fd_get_polling_island(grpc_fd *fd) { polling_island *pi; - gpr_mu_lock(&fd->pi_mu); + gpr_mu_lock(&fd->mu); pi = fd->polling_island; - gpr_mu_unlock(&fd->pi_mu); + gpr_mu_unlock(&fd->mu); return pi; } @@ -1794,6 +1838,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c index 9e306af5fa..c2107e5e39 100644 --- a/src/core/lib/iomgr/ev_poll_and_epoll_posix.c +++ b/src/core/lib/iomgr/ev_poll_and_epoll_posix.c @@ -725,6 +725,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -2006,6 +2008,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_poll_posix.c b/src/core/lib/iomgr/ev_poll_posix.c index 45c0a5e954..4b593f4b2c 100644 --- a/src/core/lib/iomgr/ev_poll_posix.c +++ b/src/core/lib/iomgr/ev_poll_posix.c @@ -617,6 +617,8 @@ static void fd_end_poll(grpc_exec_ctx *exec_ctx, grpc_fd_watcher *watcher, GRPC_FD_UNREF(fd, "poll"); } +static grpc_workqueue *fd_get_workqueue(grpc_fd *fd) { return NULL; } + /******************************************************************************* * pollset_posix.c */ @@ -1234,6 +1236,7 @@ static const grpc_event_engine_vtable vtable = { .fd_notify_on_read = fd_notify_on_read, .fd_notify_on_write = fd_notify_on_write, .fd_get_read_notifier_pollset = fd_get_read_notifier_pollset, + .fd_get_workqueue = fd_get_workqueue, .pollset_init = pollset_init, .pollset_shutdown = pollset_shutdown, diff --git a/src/core/lib/iomgr/ev_posix.c b/src/core/lib/iomgr/ev_posix.c index a3c1e9db9a..6536672685 100644 --- a/src/core/lib/iomgr/ev_posix.c +++ b/src/core/lib/iomgr/ev_posix.c @@ -148,6 +148,10 @@ grpc_fd *grpc_fd_create(int fd, const char *name) { return g_event_engine->fd_create(fd, name); } +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd) { + return g_event_engine->fd_get_workqueue(fd); +} + int grpc_fd_wrapped_fd(grpc_fd *fd) { return g_event_engine->fd_wrapped_fd(fd); } diff --git a/src/core/lib/iomgr/ev_posix.h b/src/core/lib/iomgr/ev_posix.h index 579c84ef70..c2aa1756ea 100644 --- a/src/core/lib/iomgr/ev_posix.h +++ b/src/core/lib/iomgr/ev_posix.h @@ -56,6 +56,7 @@ typedef struct grpc_event_engine_vtable { void (*fd_notify_on_write)(grpc_exec_ctx *exec_ctx, grpc_fd *fd, grpc_closure *closure); bool (*fd_is_shutdown)(grpc_fd *fd); + grpc_workqueue *(*fd_get_workqueue)(grpc_fd *fd); grpc_pollset *(*fd_get_read_notifier_pollset)(grpc_exec_ctx *exec_ctx, grpc_fd *fd); @@ -107,6 +108,9 @@ const char *grpc_get_poll_strategy_name(); This takes ownership of closing fd. */ grpc_fd *grpc_fd_create(int fd, const char *name); +/* Get a workqueue that's associated with this fd */ +grpc_workqueue *grpc_fd_get_workqueue(grpc_fd *fd); + /* Return the wrapped fd, or -1 if it has been released or closed. */ int grpc_fd_wrapped_fd(grpc_fd *fd); diff --git a/src/core/lib/iomgr/exec_ctx.c b/src/core/lib/iomgr/exec_ctx.c index c44aafcddf..ac7785ec13 100644 --- a/src/core/lib/iomgr/exec_ctx.c +++ b/src/core/lib/iomgr/exec_ctx.c @@ -37,6 +37,7 @@ #include <grpc/support/sync.h> #include <grpc/support/thd.h> +#include "src/core/lib/iomgr/workqueue.h" #include "src/core/lib/profiling/timers.h" bool grpc_exec_ctx_ready_to_finish(grpc_exec_ctx *exec_ctx) { @@ -85,14 +86,17 @@ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx) { void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); - grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + if (offload_target_or_null == NULL) { + grpc_closure_list_append(&exec_ctx->closure_list, closure, error); + } else { + grpc_workqueue_enqueue(exec_ctx, offload_target_or_null, closure, error); + GRPC_WORKQUEUE_UNREF(exec_ctx, offload_target_or_null, "exec_ctx_sched"); + } } void grpc_exec_ctx_enqueue_list(grpc_exec_ctx *exec_ctx, grpc_closure_list *list, grpc_workqueue *offload_target_or_null) { - GPR_ASSERT(offload_target_or_null == NULL); grpc_closure_list_move(list, &exec_ctx->closure_list); } diff --git a/src/core/lib/iomgr/exec_ctx.h b/src/core/lib/iomgr/exec_ctx.h index 38f27d9b13..917f332f03 100644 --- a/src/core/lib/iomgr/exec_ctx.h +++ b/src/core/lib/iomgr/exec_ctx.h @@ -93,7 +93,11 @@ bool grpc_exec_ctx_flush(grpc_exec_ctx *exec_ctx); /** Finish any pending work for a grpc_exec_ctx. Must be called before * the instance is destroyed, or work may be lost. */ void grpc_exec_ctx_finish(grpc_exec_ctx *exec_ctx); -/** Add a closure to be executed at the next flush/finish point */ +/** Add a closure to be executed in the future. + If \a offload_target_or_null is NULL, the closure will be executed at the + next exec_ctx.{finish,flush} point. + If \a offload_target_or_null is non-NULL, the closure will be scheduled + against the workqueue, and a reference to the workqueue will be consumed. */ void grpc_exec_ctx_sched(grpc_exec_ctx *exec_ctx, grpc_closure *closure, grpc_error *error, grpc_workqueue *offload_target_or_null); diff --git a/src/core/lib/iomgr/iomgr.c b/src/core/lib/iomgr/iomgr.c index 89292a153e..d67d388b8c 100644 --- a/src/core/lib/iomgr/iomgr.c +++ b/src/core/lib/iomgr/iomgr.c @@ -45,6 +45,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_internal.h" +#include "src/core/lib/iomgr/network_status_tracker.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/support/env.h" #include "src/core/lib/support/string.h" @@ -62,6 +63,7 @@ void grpc_iomgr_init(void) { grpc_timer_list_init(gpr_now(GPR_CLOCK_MONOTONIC)); g_root_object.next = g_root_object.prev = &g_root_object; g_root_object.name = "root"; + grpc_network_status_init(); grpc_iomgr_platform_init(); } @@ -140,6 +142,7 @@ void grpc_iomgr_shutdown(void) { grpc_iomgr_platform_shutdown(); grpc_exec_ctx_global_shutdown(); + grpc_network_status_shutdown(); gpr_mu_destroy(&g_mu); gpr_cv_destroy(&g_rcv); } diff --git a/src/core/lib/iomgr/network_status_tracker.c b/src/core/lib/iomgr/network_status_tracker.c index 38a1c9b7d4..b4bb7e3cf7 100644 --- a/src/core/lib/iomgr/network_status_tracker.c +++ b/src/core/lib/iomgr/network_status_tracker.c @@ -42,10 +42,16 @@ typedef struct endpoint_ll_node { static endpoint_ll_node *head = NULL; static gpr_mu g_endpoint_mutex; -static bool g_init_done = false; -void grpc_initialize_network_status_monitor() { - g_init_done = true; +void grpc_network_status_shutdown(void) { + if (head != NULL) { + gpr_log(GPR_ERROR, + "Memory leaked as all network endpoints were not shut down"); + } + gpr_mu_destroy(&g_endpoint_mutex); +} + +void grpc_network_status_init(void) { gpr_mu_init(&g_endpoint_mutex); // TODO(makarandd): Install callback with OS to monitor network status. } @@ -60,9 +66,6 @@ void grpc_destroy_network_status_monitor() { } void grpc_network_status_register_endpoint(grpc_endpoint *ep) { - if (!g_init_done) { - grpc_initialize_network_status_monitor(); - } gpr_mu_lock(&g_endpoint_mutex); if (head == NULL) { head = (endpoint_ll_node *)gpr_malloc(sizeof(endpoint_ll_node)); diff --git a/src/core/lib/iomgr/network_status_tracker.h b/src/core/lib/iomgr/network_status_tracker.h index 74a1aa8135..67cb645f44 100644 --- a/src/core/lib/iomgr/network_status_tracker.h +++ b/src/core/lib/iomgr/network_status_tracker.h @@ -35,7 +35,11 @@ #define GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H #include "src/core/lib/iomgr/endpoint.h" +void grpc_network_status_init(void); +void grpc_network_status_shutdown(void); + void grpc_network_status_register_endpoint(grpc_endpoint *ep); void grpc_network_status_unregister_endpoint(grpc_endpoint *ep); void grpc_network_status_shutdown_all_endpoints(); + #endif /* GRPC_CORE_LIB_IOMGR_NETWORK_STATUS_TRACKER_H */ diff --git a/src/core/lib/iomgr/tcp_posix.c b/src/core/lib/iomgr/tcp_posix.c index 2ab45e33ce..ec21e03944 100644 --- a/src/core/lib/iomgr/tcp_posix.c +++ b/src/core/lib/iomgr/tcp_posix.c @@ -284,7 +284,7 @@ static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep, } /* returns true if done, false if pending; if returning true, *error is set */ -#define MAX_WRITE_IOVEC 16 +#define MAX_WRITE_IOVEC 1024 static bool tcp_flush(grpc_tcp *tcp, grpc_error **error) { struct msghdr msg; struct iovec iov[MAX_WRITE_IOVEC]; @@ -450,9 +450,19 @@ static char *tcp_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static const grpc_endpoint_vtable vtable = { - tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set, - tcp_shutdown, tcp_destroy, tcp_get_peer}; +static grpc_workqueue *tcp_get_workqueue(grpc_endpoint *ep) { + grpc_tcp *tcp = (grpc_tcp *)ep; + return grpc_fd_get_workqueue(tcp->em_fd); +} + +static const grpc_endpoint_vtable vtable = {tcp_read, + tcp_write, + tcp_get_workqueue, + tcp_add_to_pollset, + tcp_add_to_pollset_set, + tcp_shutdown, + tcp_destroy, + tcp_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size, const char *peer_string) { diff --git a/src/core/lib/iomgr/tcp_server_posix.c b/src/core/lib/iomgr/tcp_server_posix.c index d3803c3bd0..cb2ff782d6 100644 --- a/src/core/lib/iomgr/tcp_server_posix.c +++ b/src/core/lib/iomgr/tcp_server_posix.c @@ -491,7 +491,8 @@ static grpc_error *clone_port(grpc_tcp_listener *listener, unsigned count) { } for (unsigned i = 0; i < count; i++) { - int fd, port; + int fd = -1; + int port = -1; grpc_dualstack_mode dsmode; err = grpc_create_dualstack_socket(&listener->addr.sockaddr, SOCK_STREAM, 0, &dsmode, &fd); diff --git a/src/core/lib/iomgr/tcp_windows.c b/src/core/lib/iomgr/tcp_windows.c index 37ab59021e..35054c42b5 100644 --- a/src/core/lib/iomgr/tcp_windows.c +++ b/src/core/lib/iomgr/tcp_windows.c @@ -389,9 +389,16 @@ static char *win_get_peer(grpc_endpoint *ep) { return gpr_strdup(tcp->peer_string); } -static grpc_endpoint_vtable vtable = { - win_read, win_write, win_add_to_pollset, win_add_to_pollset_set, - win_shutdown, win_destroy, win_get_peer}; +static grpc_workqueue *win_get_workqueue(grpc_endpoint *ep) { return NULL; } + +static grpc_endpoint_vtable vtable = {win_read, + win_write, + win_get_workqueue, + win_add_to_pollset, + win_add_to_pollset_set, + win_shutdown, + win_destroy, + win_get_peer}; grpc_endpoint *grpc_tcp_create(grpc_winsocket *socket, char *peer_string) { grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp)); diff --git a/src/core/lib/iomgr/udp_server.c b/src/core/lib/iomgr/udp_server.c index 1ebccf2ee2..48032412a2 100644 --- a/src/core/lib/iomgr/udp_server.c +++ b/src/core/lib/iomgr/udp_server.c @@ -60,6 +60,7 @@ #include <grpc/support/string_util.h> #include <grpc/support/sync.h> #include <grpc/support/time.h> +#include "src/core/lib/iomgr/error.h" #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/resolve_address.h" #include "src/core/lib/iomgr/sockaddr_utils.h" @@ -128,7 +129,7 @@ grpc_udp_server *grpc_udp_server_create(void) { } static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { - grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1, NULL); + grpc_exec_ctx_sched(exec_ctx, s->shutdown_complete, GRPC_ERROR_NONE, NULL); gpr_mu_destroy(&s->mu); gpr_cv_destroy(&s->cv); @@ -138,7 +139,7 @@ static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) { } static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, - bool success) { + grpc_error *error) { grpc_udp_server *s = server; gpr_mu_lock(&s->mu); s->destroyed_ports++; @@ -217,14 +218,23 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) { - gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd, - strerror(errno)); + if (grpc_set_socket_nonblocking(fd, 1) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set nonblocking %d: %s", fd, strerror(errno)); + goto error; + } + if (grpc_set_socket_cloexec(fd, 1) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set cloexec %d: %s", fd, strerror(errno)); + goto error; } - if (grpc_set_socket_ip_pktinfo_if_possible(fd) && - addr->sa_family == AF_INET6) { - grpc_set_socket_ipv6_recvpktinfo_if_possible(fd); + if (grpc_set_socket_ip_pktinfo_if_possible(fd) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set ip_pktinfo."); + goto error; + } else if (addr->sa_family == AF_INET6) { + if (grpc_set_socket_ipv6_recvpktinfo_if_possible(fd) != GRPC_ERROR_NONE) { + gpr_log(GPR_ERROR, "Unable to set ipv6_recvpktinfo."); + goto error; + } } GPR_ASSERT(addr_len < ~(socklen_t)0); @@ -241,13 +251,13 @@ static int prepare_socket(int fd, const struct sockaddr *addr, goto error; } - if (!grpc_set_socket_sndbuf(fd, buffer_size_bytes)) { + if (grpc_set_socket_sndbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set send buffer size to %d bytes", buffer_size_bytes); goto error; } - if (!grpc_set_socket_rcvbuf(fd, buffer_size_bytes)) { + if (grpc_set_socket_rcvbuf(fd, buffer_size_bytes) != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to set receive buffer size to %d bytes", buffer_size_bytes); goto error; @@ -263,10 +273,10 @@ error: } /* event manager callback when reads are ready */ -static void on_read(grpc_exec_ctx *exec_ctx, void *arg, bool success) { +static void on_read(grpc_exec_ctx *exec_ctx, void *arg, grpc_error *error) { server_port *sp = arg; - if (!success) { + if (error != GRPC_ERROR_NONE) { gpr_mu_lock(&sp->server->mu); if (0 == --sp->server->active_ports) { gpr_mu_unlock(&sp->server->mu); @@ -369,7 +379,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, /* Try listening on IPv6 first. */ addr = (struct sockaddr *)&wild6; addr_len = sizeof(wild6); - fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + // TODO(rjshade): Test and propagate the returned grpc_error*: + grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb, orphan_cb); if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) { @@ -384,7 +395,8 @@ int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr, addr_len = sizeof(wild4); } - fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode); + // TODO(rjshade): Test and propagate the returned grpc_error*: + grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode, &fd); if (fd < 0) { gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno)); } diff --git a/src/core/lib/iomgr/workqueue.h b/src/core/lib/iomgr/workqueue.h index 5cc40eea50..7156e490d7 100644 --- a/src/core/lib/iomgr/workqueue.h +++ b/src/core/lib/iomgr/workqueue.h @@ -38,6 +38,7 @@ #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/core/lib/iomgr/pollset.h" +#include "src/core/lib/iomgr/pollset_set.h" #ifdef GPR_POSIX_SOCKET #include "src/core/lib/iomgr/workqueue_posix.h" @@ -49,35 +50,45 @@ /* grpc_workqueue is forward declared in exec_ctx.h */ -/** Create a work queue */ -grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, - grpc_workqueue **workqueue); - +/* Deprecated: do not use. + This has *already* been removed in a future commit. */ void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); -#define GRPC_WORKQUEUE_REFCOUNT_DEBUG +/* Reference counting functions. Use the macro's always + (GRPC_WORKQUEUE_{REF,UNREF}). + + Pass in a descriptive reason string for reffing/unreffing as the last + argument to each macro. When GRPC_WORKQUEUE_REFCOUNT_DEBUG is defined, that + string will be printed alongside the refcount. When it is not defined, the + string will be discarded at compilation time. */ + +//#define GRPC_WORKQUEUE_REFCOUNT_DEBUG #ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG #define GRPC_WORKQUEUE_REF(p, r) \ - grpc_workqueue_ref((p), __FILE__, __LINE__, (r)) -#define GRPC_WORKQUEUE_UNREF(cl, p, r) \ - grpc_workqueue_unref((cl), (p), __FILE__, __LINE__, (r)) + (grpc_workqueue_ref((p), __FILE__, __LINE__, (r)), (p)) +#define GRPC_WORKQUEUE_UNREF(exec_ctx, p, r) \ + grpc_workqueue_unref((exec_ctx), (p), __FILE__, __LINE__, (r)) void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, const char *reason); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, const char *file, int line, const char *reason); #else -#define GRPC_WORKQUEUE_REF(p, r) grpc_workqueue_ref((p)) +#define GRPC_WORKQUEUE_REF(p, r) (grpc_workqueue_ref((p)), (p)) #define GRPC_WORKQUEUE_UNREF(cl, p, r) grpc_workqueue_unref((cl), (p)) void grpc_workqueue_ref(grpc_workqueue *workqueue); void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue); #endif -/** Bind this workqueue to a pollset */ -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset); +/** Add a work item to a workqueue. Items added to a work queue will be started + in approximately the order they were enqueued, on some thread that may or + may not be the current thread. Successive closures enqueued onto a workqueue + MAY be executed concurrently. + + It is generally more expensive to add a closure to a workqueue than to the + execution context, both in terms of CPU work and in execution latency. -/** Add a work item to a workqueue */ + Use work queues when it's important that other threads be given a chance to + tackle some workload. */ void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, grpc_closure *closure, grpc_error *error); diff --git a/src/core/lib/iomgr/workqueue_posix.c b/src/core/lib/iomgr/workqueue_posix.c index 45e0f6063b..e0d6dac230 100644 --- a/src/core/lib/iomgr/workqueue_posix.c +++ b/src/core/lib/iomgr/workqueue_posix.c @@ -70,7 +70,7 @@ grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, static void workqueue_destroy(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { - GPR_ASSERT(grpc_closure_list_empty(workqueue->closure_list)); + grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); grpc_fd_shutdown(exec_ctx, workqueue->wakeup_read_fd); } @@ -100,12 +100,6 @@ void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { } } -void grpc_workqueue_add_to_pollset(grpc_exec_ctx *exec_ctx, - grpc_workqueue *workqueue, - grpc_pollset *pollset) { - grpc_pollset_add_fd(exec_ctx, pollset, workqueue->wakeup_read_fd); -} - void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) { gpr_mu_lock(&workqueue->mu); grpc_exec_ctx_enqueue_list(exec_ctx, &workqueue->closure_list, NULL); diff --git a/src/core/lib/iomgr/workqueue_posix.h b/src/core/lib/iomgr/workqueue_posix.h index dcb47e7b59..0f26ba58e2 100644 --- a/src/core/lib/iomgr/workqueue_posix.h +++ b/src/core/lib/iomgr/workqueue_posix.h @@ -50,4 +50,9 @@ struct grpc_workqueue { grpc_closure read_closure; }; +/** Create a work queue. Returns an error if creation fails. If creation + succeeds, sets *workqueue to point to it. */ +grpc_error *grpc_workqueue_create(grpc_exec_ctx *exec_ctx, + grpc_workqueue **workqueue); + #endif /* GRPC_CORE_LIB_IOMGR_WORKQUEUE_POSIX_H */ diff --git a/src/core/lib/iomgr/workqueue_windows.c b/src/core/lib/iomgr/workqueue_windows.c index 275f040b1c..23e2dea185 100644 --- a/src/core/lib/iomgr/workqueue_windows.c +++ b/src/core/lib/iomgr/workqueue_windows.c @@ -37,4 +37,26 @@ #include "src/core/lib/iomgr/workqueue.h" +// Minimal implementation of grpc_workqueue for Windows +// Works by directly enqueuing workqueue items onto the current execution +// context, which is at least correct, if not performant or in the spirit of +// workqueues. + +void grpc_workqueue_flush(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} + +#ifdef GRPC_WORKQUEUE_REFCOUNT_DEBUG +void grpc_workqueue_ref(grpc_workqueue *workqueue, const char *file, int line, + const char *reason) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + const char *file, int line, const char *reason) {} +#else +void grpc_workqueue_ref(grpc_workqueue *workqueue) {} +void grpc_workqueue_unref(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue) {} +#endif + +void grpc_workqueue_enqueue(grpc_exec_ctx *exec_ctx, grpc_workqueue *workqueue, + grpc_closure *closure, grpc_error *error) { + grpc_exec_ctx_sched(exec_ctx, closure, error, NULL); +} + #endif /* GPR_WINDOWS */ diff --git a/src/core/lib/security/transport/secure_endpoint.c b/src/core/lib/security/transport/secure_endpoint.c index 7650d68e89..bc50f9d1b0 100644 --- a/src/core/lib/security/transport/secure_endpoint.c +++ b/src/core/lib/security/transport/secure_endpoint.c @@ -360,11 +360,19 @@ static char *endpoint_get_peer(grpc_endpoint *secure_ep) { return grpc_endpoint_get_peer(ep->wrapped_ep); } -static const grpc_endpoint_vtable vtable = { - endpoint_read, endpoint_write, - endpoint_add_to_pollset, endpoint_add_to_pollset_set, - endpoint_shutdown, endpoint_destroy, - endpoint_get_peer}; +static grpc_workqueue *endpoint_get_workqueue(grpc_endpoint *secure_ep) { + secure_endpoint *ep = (secure_endpoint *)secure_ep; + return grpc_endpoint_get_workqueue(ep->wrapped_ep); +} + +static const grpc_endpoint_vtable vtable = {endpoint_read, + endpoint_write, + endpoint_get_workqueue, + endpoint_add_to_pollset, + endpoint_add_to_pollset_set, + endpoint_shutdown, + endpoint_destroy, + endpoint_get_peer}; grpc_endpoint *grpc_secure_endpoint_create( struct tsi_frame_protector *protector, grpc_endpoint *transport, diff --git a/src/core/lib/surface/server.c b/src/core/lib/surface/server.c index def6e5068b..2f108af48a 100644 --- a/src/core/lib/surface/server.c +++ b/src/core/lib/surface/server.c @@ -73,6 +73,7 @@ typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type; typedef struct requested_call { requested_call_type type; + size_t cq_idx; void *tag; grpc_server *server; grpc_completion_queue *cq_bound_to_call; @@ -206,11 +207,11 @@ struct grpc_server { registered_method *registered_methods; /** one request matcher for unregistered methods */ request_matcher unregistered_request_matcher; - /** free list of available requested_calls indices */ - gpr_stack_lockfree *request_freelist; + /** free list of available requested_calls_per_cq indices */ + gpr_stack_lockfree **request_freelist_per_cq; /** requested call backing data */ - requested_call *requested_calls; - size_t max_requested_calls; + requested_call **requested_calls_per_cq; + int max_requested_calls_per_cq; gpr_atm shutdown_flag; uint8_t shutdown_published; @@ -357,7 +358,8 @@ static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx, for (size_t i = 0; i < server->cq_count; i++) { while ((request_id = gpr_stack_lockfree_pop(rm->requests_per_cq[i])) != -1) { - fail_call(exec_ctx, server, i, &server->requested_calls[request_id], + fail_call(exec_ctx, server, i, + &server->requested_calls_per_cq[i][request_id], GRPC_ERROR_REF(error)); } } @@ -392,12 +394,16 @@ static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) { } for (i = 0; i < server->cq_count; i++) { GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server"); + if (server->started) { + gpr_stack_lockfree_destroy(server->request_freelist_per_cq[i]); + gpr_free(server->requested_calls_per_cq[i]); + } } - gpr_stack_lockfree_destroy(server->request_freelist); + gpr_free(server->request_freelist_per_cq); + gpr_free(server->requested_calls_per_cq); gpr_free(server->cqs); gpr_free(server->pollsets); gpr_free(server->shutdown_tags); - gpr_free(server->requested_calls); gpr_free(server); } @@ -460,11 +466,13 @@ static void done_request_event(grpc_exec_ctx *exec_ctx, void *req, requested_call *rc = req; grpc_server *server = rc->server; - if (rc >= server->requested_calls && - rc < server->requested_calls + server->max_requested_calls) { - GPR_ASSERT(rc - server->requested_calls <= INT_MAX); - gpr_stack_lockfree_push(server->request_freelist, - (int)(rc - server->requested_calls)); + if (rc >= server->requested_calls_per_cq[rc->cq_idx] && + rc < server->requested_calls_per_cq[rc->cq_idx] + + server->max_requested_calls_per_cq) { + GPR_ASSERT(rc - server->requested_calls_per_cq[rc->cq_idx] <= INT_MAX); + gpr_stack_lockfree_push( + server->request_freelist_per_cq[rc->cq_idx], + (int)(rc - server->requested_calls_per_cq[rc->cq_idx])); } else { gpr_free(req); } @@ -540,7 +548,7 @@ static void publish_new_rpc(grpc_exec_ctx *exec_ctx, void *arg, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); return; /* early out */ } } @@ -979,8 +987,6 @@ void grpc_server_register_non_listening_completion_queue( } grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { - size_t i; - GRPC_API_TRACE("grpc_server_create(%p, %p)", 2, (args, reserved)); grpc_server *server = gpr_malloc(sizeof(grpc_server)); @@ -998,15 +1004,7 @@ grpc_server *grpc_server_create(const grpc_channel_args *args, void *reserved) { &server->root_channel_data; /* TODO(ctiller): expose a channel_arg for this */ - server->max_requested_calls = 32768; - server->request_freelist = - gpr_stack_lockfree_create(server->max_requested_calls); - for (i = 0; i < (size_t)server->max_requested_calls; i++) { - gpr_stack_lockfree_push(server->request_freelist, (int)i); - } - server->requested_calls = gpr_malloc(server->max_requested_calls * - sizeof(*server->requested_calls)); - + server->max_requested_calls_per_cq = 32768; server->channel_args = grpc_channel_args_copy(args); return server; @@ -1066,16 +1064,28 @@ void grpc_server_start(grpc_server *server) { server->started = true; size_t pollset_count = 0; server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count); + server->request_freelist_per_cq = + gpr_malloc(sizeof(*server->request_freelist_per_cq) * server->cq_count); + server->requested_calls_per_cq = + gpr_malloc(sizeof(*server->requested_calls_per_cq) * server->cq_count); for (i = 0; i < server->cq_count; i++) { if (!grpc_cq_is_non_listening_server_cq(server->cqs[i])) { server->pollsets[pollset_count++] = grpc_cq_pollset(server->cqs[i]); } + server->request_freelist_per_cq[i] = + gpr_stack_lockfree_create((size_t)server->max_requested_calls_per_cq); + for (int j = 0; j < server->max_requested_calls_per_cq; j++) { + gpr_stack_lockfree_push(server->request_freelist_per_cq[i], j); + } + server->requested_calls_per_cq[i] = + gpr_malloc((size_t)server->max_requested_calls_per_cq * + sizeof(*server->requested_calls_per_cq[i])); } request_matcher_init(&server->unregistered_request_matcher, - server->max_requested_calls, server); + (size_t)server->max_requested_calls_per_cq, server); for (registered_method *rm = server->registered_methods; rm; rm = rm->next) { - request_matcher_init(&rm->request_matcher, server->max_requested_calls, - server); + request_matcher_init(&rm->request_matcher, + (size_t)server->max_requested_calls_per_cq, server); } for (l = server->listeners; l; l = l->next) { @@ -1307,11 +1317,13 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, GRPC_ERROR_CREATE("Server Shutdown")); return GRPC_CALL_OK; } - request_id = gpr_stack_lockfree_pop(server->request_freelist); + request_id = gpr_stack_lockfree_pop(server->request_freelist_per_cq[cq_idx]); if (request_id == -1) { /* out of request ids: just fail this one */ fail_call(exec_ctx, server, cq_idx, rc, - GRPC_ERROR_CREATE("Server Shutdown")); + grpc_error_set_int(GRPC_ERROR_CREATE("Out of request ids"), + GRPC_ERROR_INT_LIMIT, + server->max_requested_calls_per_cq)); return GRPC_CALL_OK; } switch (rc->type) { @@ -1322,7 +1334,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, rm = &rc->data.registered.registered_method->request_matcher; break; } - server->requested_calls[request_id] = *rc; + server->requested_calls_per_cq[cq_idx][request_id] = *rc; gpr_free(rc); if (gpr_stack_lockfree_push(rm->requests_per_cq[cq_idx], request_id)) { /* this was the first queued request: we need to lock and start @@ -1346,7 +1358,7 @@ static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx, calld->state = ACTIVATED; gpr_mu_unlock(&calld->mu_state); publish_call(exec_ctx, server, calld, cq_idx, - &server->requested_calls[request_id]); + &server->requested_calls_per_cq[cq_idx][request_id]); } gpr_mu_lock(&server->mu_call); } @@ -1382,6 +1394,7 @@ grpc_call_error grpc_server_request_call( } grpc_cq_begin_op(cq_for_notification, tag); details->reserved = NULL; + rc->cq_idx = cq_idx; rc->type = BATCH_CALL; rc->server = server; rc->tag = tag; @@ -1430,6 +1443,7 @@ grpc_call_error grpc_server_request_registered_call( goto done; } grpc_cq_begin_op(cq_for_notification, tag); + rc->cq_idx = cq_idx; rc->type = REGISTERED_CALL; rc->server = server; rc->tag = tag; diff --git a/src/core/lib/transport/connectivity_state.c b/src/core/lib/transport/connectivity_state.c index 054f112127..68d05e3a85 100644 --- a/src/core/lib/transport/connectivity_state.c +++ b/src/core/lib/transport/connectivity_state.c @@ -179,6 +179,9 @@ void grpc_connectivity_state_set(grpc_exec_ctx *exec_ctx, while ((w = tracker->watchers) != NULL) { *w->current = tracker->current_state; tracker->watchers = w->next; + if (grpc_connectivity_state_trace) { + gpr_log(GPR_DEBUG, "NOTIFY: %p", w->notify); + } grpc_exec_ctx_sched(exec_ctx, w->notify, GRPC_ERROR_REF(tracker->current_error), NULL); gpr_free(w); diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc index fb4c68ebe4..af04fd4ca6 100644 --- a/src/cpp/server/server.cc +++ b/src/cpp/server/server.cc @@ -281,6 +281,7 @@ Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, : max_message_size_(max_message_size), started_(false), shutdown_(false), + shutdown_notified_(false), num_running_cb_(0), sync_methods_(new std::list<SyncRequest>), has_generic_service_(false), @@ -462,13 +463,16 @@ void Server::ShutdownInternal(gpr_timespec deadline) { while (num_running_cb_ != 0) { callback_cv_.wait(lock); } + + shutdown_notified_ = true; + shutdown_cv_.notify_all(); } } void Server::Wait() { grpc::unique_lock<grpc::mutex> lock(mu_); - while (num_running_cb_ != 0) { - callback_cv_.wait(lock); + while (started_ && !shutdown_notified_) { + shutdown_cv_.wait(lock); } } diff --git a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec index 72cadb9319..97f4f586b7 100644 --- a/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec +++ b/src/objective-c/!ProtoCompiler-gRPCPlugin.podspec @@ -36,7 +36,7 @@ Pod::Spec.new do |s| # exclamation mark ensures that other "regular" pods will be able to find it as it'll be installed # before them. s.name = '!ProtoCompiler-gRPCPlugin' - v = '0.14.0' + v = '1.0.0-pre1' s.version = v s.summary = 'The gRPC ProtoC plugin generates Objective-C files from .proto services.' s.description = <<-DESC @@ -82,7 +82,7 @@ Pod::Spec.new do |s| s.authors = { 'The gRPC contributors' => 'grpc-packages@google.com' } repo = 'grpc/grpc' - release = 'release-0_14_1' + release = "objective-c-v#{v}" file = "grpc_objective_c_plugin-#{v}-macos-x86_64.zip" s.source = { :http => "https://github.com/#{repo}/releases/download/#{release}/#{file}", diff --git a/src/objective-c/!ProtoCompiler.podspec b/src/objective-c/!ProtoCompiler.podspec index 5e59b25aee..56aacc3330 100644 --- a/src/objective-c/!ProtoCompiler.podspec +++ b/src/objective-c/!ProtoCompiler.podspec @@ -97,10 +97,9 @@ Pod::Spec.new do |s| s.authors = { 'The Protocol Buffers contributors' => 'protobuf@googlegroups.com' } repo = 'google/protobuf' - release = "v#{v}" file = "protoc-#{v}-osx-x86_64.zip" s.source = { - :http => "https://github.com/#{repo}/releases/download/#{release}/#{file}", + :http => "https://github.com/#{repo}/releases/download/v#{v}/#{file}", # TODO(jcanizales): Add sha1 or sha256 # :sha1 => '??', } diff --git a/src/objective-c/BoringSSL.podspec b/src/objective-c/BoringSSL.podspec index 42b4434d0d..b759997c11 100644 --- a/src/objective-c/BoringSSL.podspec +++ b/src/objective-c/BoringSSL.podspec @@ -31,7 +31,7 @@ Pod::Spec.new do |s| s.name = 'BoringSSL' - version = '4.0' + version = '5.0' s.version = version s.summary = 'BoringSSL is a fork of OpenSSL that is designed to meet Google’s needs.' # Adapted from the homepage: @@ -67,8 +67,11 @@ Pod::Spec.new do |s| # "The name and email addresses of the library maintainers, not the Podspec maintainer." s.authors = 'Adam Langley', 'David Benjamin', 'Matt Braithwaite' - s.source = { :git => 'https://boringssl.googlesource.com/boringssl', - :tag => "version_for_cocoapods_#{version}" } + s.source = { + :git => 'https://boringssl.googlesource.com/boringssl', + :tag => "version_for_cocoapods_#{version}", + # :commit => '8d343b44bbab829d1a28fdef650ca95f7db4412e', + } name = 'openssl' @@ -219,7 +222,7 @@ Pod::Spec.new do |s| * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ - /* This file was generated by err_data_generate.go. */ + /* This file was generated by err_data_generate.go. */ #include <openssl/base.h> #include <openssl/err.h> @@ -385,42 +388,42 @@ Pod::Spec.new do |s| 0x28340c19, 0x283480ac, 0x283500ea, - 0x2c322775, - 0x2c32a783, - 0x2c332795, - 0x2c33a7a7, - 0x2c3427bb, - 0x2c34a7cd, - 0x2c3527e8, - 0x2c35a7fa, - 0x2c36280d, + 0x2c3227cb, + 0x2c32a7d9, + 0x2c3327eb, + 0x2c33a7fd, + 0x2c342811, + 0x2c34a823, + 0x2c35283e, + 0x2c35a850, + 0x2c362863, 0x2c36832d, - 0x2c37281a, - 0x2c37a82c, - 0x2c38283f, - 0x2c38a856, - 0x2c392864, - 0x2c39a874, - 0x2c3a2886, - 0x2c3aa89a, - 0x2c3b28ab, - 0x2c3ba8ca, - 0x2c3c28de, - 0x2c3ca8f4, - 0x2c3d290d, - 0x2c3da92a, - 0x2c3e293b, - 0x2c3ea949, - 0x2c3f2961, - 0x2c3fa979, - 0x2c402986, + 0x2c372870, + 0x2c37a882, + 0x2c382895, + 0x2c38a8ac, + 0x2c3928ba, + 0x2c39a8ca, + 0x2c3a28dc, + 0x2c3aa8f0, + 0x2c3b2901, + 0x2c3ba920, + 0x2c3c2934, + 0x2c3ca94a, + 0x2c3d2963, + 0x2c3da980, + 0x2c3e2991, + 0x2c3ea99f, + 0x2c3f29b7, + 0x2c3fa9cf, + 0x2c4029dc, 0x2c4090e7, - 0x2c412997, - 0x2c41a9aa, + 0x2c4129ed, + 0x2c41aa00, 0x2c4210c0, - 0x2c42a9bb, + 0x2c42aa11, 0x2c430720, - 0x2c43a8bc, + 0x2c43a912, 0x30320000, 0x30328015, 0x3033001f, @@ -594,145 +597,148 @@ Pod::Spec.new do |s| 0x404619e8, 0x40469a08, 0x40471a16, - 0x40479a2a, - 0x40481a3f, - 0x40489a58, - 0x40491a6f, - 0x40499a89, - 0x404a1aa0, - 0x404a9abe, - 0x404b1ad6, - 0x404b9aed, - 0x404c1b03, - 0x404c9b15, - 0x404d1b36, - 0x404d9b58, - 0x404e1b6c, - 0x404e9b79, - 0x404f1b90, - 0x404f9ba0, - 0x40501bca, - 0x40509bde, - 0x40511bf9, - 0x40519c09, - 0x40521c20, - 0x40529c32, - 0x40531c4a, - 0x40539c5d, - 0x40541c72, - 0x40549c95, - 0x40551ca3, - 0x40559cc0, - 0x40561ccd, - 0x40569ce6, - 0x40571cfe, - 0x40579d11, - 0x40581d26, - 0x40589d38, - 0x40591d48, - 0x40599d61, - 0x405a1d75, - 0x405a9d85, - 0x405b1d9d, - 0x405b9dae, - 0x405c1dc1, - 0x405c9dd2, - 0x405d1ddf, - 0x405d9df6, - 0x405e1e16, + 0x40479a3d, + 0x40481a52, + 0x40489a6b, + 0x40491a82, + 0x40499a9c, + 0x404a1ab3, + 0x404a9ad1, + 0x404b1ae9, + 0x404b9b00, + 0x404c1b16, + 0x404c9b28, + 0x404d1b49, + 0x404d9b6b, + 0x404e1b7f, + 0x404e9b8c, + 0x404f1ba3, + 0x404f9bb3, + 0x40501bdd, + 0x40509bf1, + 0x40511c0c, + 0x40519c1c, + 0x40521c33, + 0x40529c45, + 0x40531c5d, + 0x40539c70, + 0x40541c85, + 0x40549ca8, + 0x40551cb6, + 0x40559cd3, + 0x40561ce0, + 0x40569cf9, + 0x40571d11, + 0x40579d24, + 0x40581d39, + 0x40589d4b, + 0x40591d7a, + 0x40599d93, + 0x405a1da7, + 0x405a9db7, + 0x405b1dcf, + 0x405b9de0, + 0x405c1df3, + 0x405c9e04, + 0x405d1e11, + 0x405d9e28, + 0x405e1e48, 0x405e8a95, - 0x405f1e37, - 0x405f9e44, - 0x40601e52, - 0x40609e74, - 0x40611e9c, - 0x40619eb1, - 0x40621ec8, - 0x40629ed9, - 0x40631eea, - 0x40639eff, - 0x40641f16, - 0x40649f27, - 0x40651f42, - 0x40659f59, - 0x40661f71, - 0x40669f9b, - 0x40671fc6, - 0x40679fe7, - 0x40681ffa, - 0x4068a01b, - 0x4069204d, - 0x4069a07b, - 0x406a209c, - 0x406aa0bc, - 0x406b2244, - 0x406ba267, - 0x406c227d, - 0x406ca4a9, - 0x406d24d8, - 0x406da500, - 0x406e2519, - 0x406ea531, - 0x406f2550, - 0x406fa565, - 0x40702578, - 0x4070a595, + 0x405f1e69, + 0x405f9e76, + 0x40601e84, + 0x40609ea6, + 0x40611ece, + 0x40619ee3, + 0x40621efa, + 0x40629f0b, + 0x40631f1c, + 0x40639f31, + 0x40641f48, + 0x40649f59, + 0x40651f74, + 0x40659f8b, + 0x40661fa3, + 0x40669fcd, + 0x40671ff8, + 0x4067a019, + 0x4068202c, + 0x4068a04d, + 0x4069207f, + 0x4069a0ad, + 0x406a20ce, + 0x406aa0ee, + 0x406b2276, + 0x406ba299, + 0x406c22af, + 0x406ca4db, + 0x406d250a, + 0x406da532, + 0x406e254b, + 0x406ea563, + 0x406f2582, + 0x406fa597, + 0x407025aa, + 0x4070a5c7, 0x40710800, - 0x4071a5a7, - 0x407225ba, - 0x4072a5d3, - 0x407325eb, + 0x4071a5d9, + 0x407225ec, + 0x4072a605, + 0x4073261d, 0x4073936d, - 0x407425ff, - 0x4074a619, - 0x4075262a, - 0x4075a63e, - 0x4076264c, + 0x40742631, + 0x4074a64b, + 0x4075265c, + 0x4075a670, + 0x4076267e, 0x407691aa, - 0x40772671, - 0x4077a693, - 0x407826ae, - 0x4078a6c3, - 0x407926da, - 0x4079a6f0, - 0x407a26fc, - 0x407aa70f, - 0x407b2724, - 0x407ba736, - 0x407c274b, - 0x407ca754, - 0x407d2036, - 0x407d9bb0, - 0x41f4216f, - 0x41f92201, - 0x41fe20f4, - 0x41fea2d0, - 0x41ff23c1, - 0x42032188, - 0x420821aa, - 0x4208a1e6, - 0x420920d8, - 0x4209a220, - 0x420a212f, - 0x420aa10f, - 0x420b214f, - 0x420ba1c8, - 0x420c23dd, - 0x420ca29d, - 0x420d22b7, - 0x420da2ee, - 0x42122308, - 0x421723a4, - 0x4217a34a, - 0x421c236c, - 0x421f2327, - 0x422123f4, - 0x42262387, - 0x422b248d, - 0x422ba456, - 0x422c2475, - 0x422ca430, - 0x422d240f, + 0x407726a3, + 0x4077a6c5, + 0x407826e0, + 0x4078a719, + 0x40792730, + 0x4079a746, + 0x407a2752, + 0x407aa765, + 0x407b277a, + 0x407ba78c, + 0x407c27a1, + 0x407ca7aa, + 0x407d2068, + 0x407d9bc3, + 0x407e26f5, + 0x407e9d5b, + 0x407f1a2a, + 0x41f421a1, + 0x41f92233, + 0x41fe2126, + 0x41fea302, + 0x41ff23f3, + 0x420321ba, + 0x420821dc, + 0x4208a218, + 0x4209210a, + 0x4209a252, + 0x420a2161, + 0x420aa141, + 0x420b2181, + 0x420ba1fa, + 0x420c240f, + 0x420ca2cf, + 0x420d22e9, + 0x420da320, + 0x4212233a, + 0x421723d6, + 0x4217a37c, + 0x421c239e, + 0x421f2359, + 0x42212426, + 0x422623b9, + 0x422b24bf, + 0x422ba488, + 0x422c24a7, + 0x422ca462, + 0x422d2441, 0x4432072b, 0x4432873a, 0x44330746, @@ -775,69 +781,69 @@ Pod::Spec.new do |s| 0x4c3d136d, 0x4c3d937c, 0x4c3e1389, - 0x503229cd, - 0x5032a9dc, - 0x503329e7, - 0x5033a9f7, - 0x50342a10, - 0x5034aa2a, - 0x50352a38, - 0x5035aa4e, - 0x50362a60, - 0x5036aa76, - 0x50372a8f, - 0x5037aaa2, - 0x50382aba, - 0x5038aacb, - 0x50392ae0, - 0x5039aaf4, - 0x503a2b14, - 0x503aab2a, - 0x503b2b42, - 0x503bab54, - 0x503c2b70, - 0x503cab87, - 0x503d2ba0, - 0x503dabb6, - 0x503e2bc3, - 0x503eabd9, - 0x503f2beb, + 0x50322a23, + 0x5032aa32, + 0x50332a3d, + 0x5033aa4d, + 0x50342a66, + 0x5034aa80, + 0x50352a8e, + 0x5035aaa4, + 0x50362ab6, + 0x5036aacc, + 0x50372ae5, + 0x5037aaf8, + 0x50382b10, + 0x5038ab21, + 0x50392b36, + 0x5039ab4a, + 0x503a2b6a, + 0x503aab80, + 0x503b2b98, + 0x503babaa, + 0x503c2bc6, + 0x503cabdd, + 0x503d2bf6, + 0x503dac0c, + 0x503e2c19, + 0x503eac2f, + 0x503f2c41, 0x503f8382, - 0x50402bfe, - 0x5040ac0e, - 0x50412c28, - 0x5041ac37, - 0x50422c51, - 0x5042ac6e, - 0x50432c7e, - 0x5043ac8e, - 0x50442c9d, + 0x50402c54, + 0x5040ac64, + 0x50412c7e, + 0x5041ac8d, + 0x50422ca7, + 0x5042acc4, + 0x50432cd4, + 0x5043ace4, + 0x50442cf3, 0x5044843f, - 0x50452cb1, - 0x5045accf, - 0x50462ce2, - 0x5046acf8, - 0x50472d0a, - 0x5047ad1f, - 0x50482d45, - 0x5048ad53, - 0x50492d66, - 0x5049ad7b, - 0x504a2d91, - 0x504aada1, - 0x504b2dc1, - 0x504badd4, - 0x504c2df7, - 0x504cae25, - 0x504d2e37, - 0x504dae54, - 0x504e2e6f, - 0x504eae8b, - 0x504f2e9d, - 0x504faeb4, - 0x50502ec3, + 0x50452d07, + 0x5045ad25, + 0x50462d38, + 0x5046ad4e, + 0x50472d60, + 0x5047ad75, + 0x50482d9b, + 0x5048ada9, + 0x50492dbc, + 0x5049add1, + 0x504a2de7, + 0x504aadf7, + 0x504b2e17, + 0x504bae2a, + 0x504c2e4d, + 0x504cae7b, + 0x504d2e8d, + 0x504daeaa, + 0x504e2ec5, + 0x504eaee1, + 0x504f2ef3, + 0x504faf0a, + 0x50502f19, 0x505086ef, - 0x50512ed6, + 0x50512f2c, 0x58320ec9, 0x68320e8b, 0x68328c25, @@ -1218,6 +1224,7 @@ Pod::Spec.new do |s| "DH_PUBLIC_VALUE_LENGTH_IS_WRONG\\0" "DH_P_TOO_LONG\\0" "DIGEST_CHECK_FAILED\\0" + "DOWNGRADE_DETECTED\\0" "DTLS_MESSAGE_TOO_BIG\\0" "ECC_CERT_NOT_FOR_SIGNING\\0" "EMS_STATE_INCONSISTENT\\0" @@ -1254,6 +1261,7 @@ Pod::Spec.new do |s| "NO_CIPHERS_AVAILABLE\\0" "NO_CIPHERS_PASSED\\0" "NO_CIPHER_MATCH\\0" + "NO_COMMON_SIGNATURE_ALGORITHMS\\0" "NO_COMPRESSION_SPECIFIED\\0" "NO_METHOD_SPECIFIED\\0" "NO_P256_SUPPORT\\0" @@ -1344,6 +1352,7 @@ Pod::Spec.new do |s| "UNSUPPORTED_COMPRESSION_ALGORITHM\\0" "UNSUPPORTED_ELLIPTIC_CURVE\\0" "UNSUPPORTED_PROTOCOL\\0" + "UNSUPPORTED_PROTOCOL_FOR_CUSTOM_KEY\\0" "WRONG_CERTIFICATE_TYPE\\0" "WRONG_CIPHER_RETURNED\\0" "WRONG_CURVE\\0" diff --git a/src/objective-c/README.md b/src/objective-c/README.md index a0ca5f448a..6e917ddd81 100644 --- a/src/objective-c/README.md +++ b/src/objective-c/README.md @@ -48,7 +48,7 @@ Pod::Spec.new do |s| src = '.' # We'll use protoc with the gRPC plugin. - s.dependency '!ProtoCompiler-gRPCPlugin', '~> 0.14' + s.dependency '!ProtoCompiler-gRPCPlugin', '~> 1.0.0-pre1' # Pods directory corresponding to this app's Podfile, relative to the location of this podspec. pods_root = '<path to your Podfile>/Pods' diff --git a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec index 6e783fb5ad..7222a80b88 100644 --- a/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/examples/RemoteTestClient/RemoteTest.podspec @@ -11,7 +11,7 @@ Pod::Spec.new do |s| s.osx.deployment_target = '10.9' # Run protoc with the Objective-C and gRPC plugins to generate protocol messages and gRPC clients. - s.dependency "!ProtoCompiler-gRPCPlugin", "~> 0.14" + s.dependency "!ProtoCompiler-gRPCPlugin", "~> 1.0.0-pre1" repo_root = '../../../..' bin_dir = "#{repo_root}/bins/$CONFIG" diff --git a/src/objective-c/examples/RemoteTestClient/empty.proto b/src/objective-c/examples/RemoteTestClient/empty.proto deleted file mode 100644 index a678048289..0000000000 --- a/src/objective-c/examples/RemoteTestClient/empty.proto +++ /dev/null @@ -1,44 +0,0 @@ -// Copyright 2015, Google Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above -// copyright notice, this list of conditions and the following disclaimer -// in the documentation and/or other materials provided with the -// distribution. -// * Neither the name of Google Inc. nor the names of its -// contributors may be used to endorse or promote products derived from -// this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -syntax = "proto3"; - -package grpc.testing; - -option objc_class_prefix = "RMT"; - -// An empty message that you can re-use to avoid defining duplicated empty -// messages in your project. A typical example is to use it as argument or the -// return value of a service API. For instance: -// -// service Foo { -// rpc Bar (grpc.testing.Empty) returns (grpc.testing.Empty) { }; -// }; -// -message Empty {} diff --git a/src/objective-c/examples/RemoteTestClient/test.proto b/src/objective-c/examples/RemoteTestClient/test.proto index 514c3b8095..5c359c5c12 100644 --- a/src/objective-c/examples/RemoteTestClient/test.proto +++ b/src/objective-c/examples/RemoteTestClient/test.proto @@ -31,7 +31,7 @@ // of unary/streaming requests/responses. syntax = "proto3"; -import "empty.proto"; +import "google/protobuf/empty.proto"; import "messages.proto"; package grpc.testing; @@ -42,7 +42,7 @@ option objc_class_prefix = "RMT"; // performance with various types of payload. service TestService { // One empty request followed by one empty response. - rpc EmptyCall(grpc.testing.Empty) returns (grpc.testing.Empty); + rpc EmptyCall(google.protobuf.Empty) returns (google.protobuf.Empty); // One request followed by one response. rpc UnaryCall(SimpleRequest) returns (SimpleResponse); diff --git a/src/objective-c/examples/Sample/Podfile b/src/objective-c/examples/Sample/Podfile index 8740b2f963..f6f0c00d5d 100644 --- a/src/objective-c/examples/Sample/Podfile +++ b/src/objective-c/examples/Sample/Podfile @@ -3,6 +3,8 @@ platform :ios, '8.0' install! 'cocoapods', :deterministic_uuids => false +use_frameworks! if ENV['FRAMEWORKS'] == 'YES' + # Location of gRPC's repo root relative to this file. GRPC_LOCAL_SRC = '../../../..' diff --git a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj index 5c2a6d14f9..ab7159cda2 100644 --- a/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj +++ b/src/objective-c/examples/Sample/Sample.xcodeproj/project.pbxproj @@ -7,7 +7,7 @@ objects = { /* Begin PBXBuildFile section */ - 426A5020E0E158A101BCA1D9 /* libPods-Sample.a in Frameworks */ = {isa = PBXBuildFile; fileRef = C20055928615A6F8434E26B4 /* libPods-Sample.a */; }; + 3F035697392F601049D3DDE1 /* Pods_Sample.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = CC1B27EA0C428429B07BC34B /* Pods_Sample.framework */; }; 6369A2701A9322E20015FC5C /* main.m in Sources */ = {isa = PBXBuildFile; fileRef = 6369A26F1A9322E20015FC5C /* main.m */; }; 6369A2731A9322E20015FC5C /* AppDelegate.m in Sources */ = {isa = PBXBuildFile; fileRef = 6369A2721A9322E20015FC5C /* AppDelegate.m */; }; 6369A2761A9322E20015FC5C /* ViewController.m in Sources */ = {isa = PBXBuildFile; fileRef = 6369A2751A9322E20015FC5C /* ViewController.m */; }; @@ -26,7 +26,7 @@ 6369A2751A9322E20015FC5C /* ViewController.m */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.c.objc; path = ViewController.m; sourceTree = "<group>"; }; 6369A2781A9322E20015FC5C /* Base */ = {isa = PBXFileReference; lastKnownFileType = file.storyboard; name = Base; path = Base.lproj/Main.storyboard; sourceTree = "<group>"; }; 6369A27A1A9322E20015FC5C /* Images.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = Images.xcassets; sourceTree = "<group>"; }; - C20055928615A6F8434E26B4 /* libPods-Sample.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = "libPods-Sample.a"; sourceTree = BUILT_PRODUCTS_DIR; }; + CC1B27EA0C428429B07BC34B /* Pods_Sample.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Sample.framework; sourceTree = BUILT_PRODUCTS_DIR; }; E3C01DF315C4E7433BCEC6E6 /* Pods-Sample.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Sample.debug.xcconfig"; path = "Pods/Target Support Files/Pods-Sample/Pods-Sample.debug.xcconfig"; sourceTree = "<group>"; }; /* End PBXFileReference section */ @@ -35,7 +35,7 @@ isa = PBXFrameworksBuildPhase; buildActionMask = 2147483647; files = ( - 426A5020E0E158A101BCA1D9 /* libPods-Sample.a in Frameworks */, + 3F035697392F601049D3DDE1 /* Pods_Sample.framework in Frameworks */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -95,7 +95,7 @@ C4C2C5219053E079C9EFB930 /* Frameworks */ = { isa = PBXGroup; children = ( - C20055928615A6F8434E26B4 /* libPods-Sample.a */, + CC1B27EA0C428429B07BC34B /* Pods_Sample.framework */, ); name = Frameworks; sourceTree = "<group>"; @@ -129,7 +129,7 @@ 6369A2621A9322E20015FC5C /* Project object */ = { isa = PBXProject; attributes = { - LastUpgradeCheck = 0610; + LastUpgradeCheck = 0730; ORGANIZATIONNAME = gRPC; TargetAttributes = { 6369A2691A9322E20015FC5C = { @@ -260,6 +260,7 @@ "CODE_SIGN_IDENTITY[sdk=iphoneos*]" = "iPhone Developer"; COPY_PHASE_STRIP = NO; ENABLE_STRICT_OBJC_MSGSEND = YES; + ENABLE_TESTABILITY = YES; GCC_C_LANGUAGE_STANDARD = gnu99; GCC_DYNAMIC_NO_PIC = NO; GCC_OPTIMIZATION_LEVEL = 0; @@ -325,6 +326,7 @@ ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Sample/Info.plist; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; + PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; }; name = Debug; @@ -336,6 +338,7 @@ ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Sample/Info.plist; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; + PRODUCT_BUNDLE_IDENTIFIER = "org.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; }; name = Release; diff --git a/src/objective-c/examples/Sample/Sample/Info.plist b/src/objective-c/examples/Sample/Sample/Info.plist index 4436635ab4..2cdf09dc2f 100644 --- a/src/objective-c/examples/Sample/Sample/Info.plist +++ b/src/objective-c/examples/Sample/Sample/Info.plist @@ -7,7 +7,7 @@ <key>CFBundleExecutable</key> <string>$(EXECUTABLE_NAME)</string> <key>CFBundleIdentifier</key> - <string>org.grpc.$(PRODUCT_NAME:rfc1034identifier)</string> + <string>$(PRODUCT_BUNDLE_IDENTIFIER)</string> <key>CFBundleInfoDictionaryVersion</key> <string>6.0</string> <key>CFBundleName</key> diff --git a/src/objective-c/examples/Sample/Sample/ViewController.m b/src/objective-c/examples/Sample/Sample/ViewController.m index 433a8a2ba3..70244ee62d 100644 --- a/src/objective-c/examples/Sample/Sample/ViewController.m +++ b/src/objective-c/examples/Sample/Sample/ViewController.m @@ -66,9 +66,9 @@ // Same example call using the generic gRPC client library: - ProtoMethod *method = [[ProtoMethod alloc] initWithPackage:@"grpc.testing" - service:@"TestService" - method:@"UnaryCall"]; + GRPCProtoMethod *method = [[GRPCProtoMethod alloc] initWithPackage:@"grpc.testing" + service:@"TestService" + method:@"UnaryCall"]; GRXWriter *requestsWriter = [GRXWriter writerWithValue:[request data]]; diff --git a/src/objective-c/examples/SwiftSample/Bridging-Header.h b/src/objective-c/examples/SwiftSample/Bridging-Header.h deleted file mode 100644 index 65f768a760..0000000000 --- a/src/objective-c/examples/SwiftSample/Bridging-Header.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -#ifndef SwiftSample_Bridging_Header_h -#define SwiftSample_Bridging_Header_h - -#import <RxLibrary/GRXWriteable.h> -#import <RxLibrary/GRXWriter.h> -#import <RxLibrary/GRXWriter+Immediate.h> -#import <GRPCClient/GRPCCall.h> -#import <ProtoRPC/ProtoMethod.h> -#import <ProtoRPC/ProtoRPC.h> -#import <RemoteTest/Test.pbrpc.h> - -#endif diff --git a/src/objective-c/examples/SwiftSample/Info.plist b/src/objective-c/examples/SwiftSample/Info.plist index 10f0450b34..2cdf09dc2f 100644 --- a/src/objective-c/examples/SwiftSample/Info.plist +++ b/src/objective-c/examples/SwiftSample/Info.plist @@ -7,7 +7,7 @@ <key>CFBundleExecutable</key> <string>$(EXECUTABLE_NAME)</string> <key>CFBundleIdentifier</key> - <string>io.grpc.$(PRODUCT_NAME:rfc1034identifier)</string> + <string>$(PRODUCT_BUNDLE_IDENTIFIER)</string> <key>CFBundleInfoDictionaryVersion</key> <string>6.0</string> <key>CFBundleName</key> diff --git a/src/objective-c/examples/SwiftSample/Podfile b/src/objective-c/examples/SwiftSample/Podfile index 2f783340f5..b08a346ae2 100644 --- a/src/objective-c/examples/SwiftSample/Podfile +++ b/src/objective-c/examples/SwiftSample/Podfile @@ -3,6 +3,8 @@ platform :ios, '8.0' install! 'cocoapods', :deterministic_uuids => false +use_frameworks! + # Location of gRPC's repo root relative to this file. GRPC_LOCAL_SRC = '../../../..' diff --git a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj index 2a1b30f2cf..afc3da7116 100644 --- a/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj +++ b/src/objective-c/examples/SwiftSample/SwiftSample.xcodeproj/project.pbxproj @@ -7,11 +7,11 @@ objects = { /* Begin PBXBuildFile section */ + 2C0B024CB798839E17F76126 /* Pods_SwiftSample.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = B394F343BDE186C5436DBDB9 /* Pods_SwiftSample.framework */; }; 633BFFC81B950B210007E424 /* AppDelegate.swift in Sources */ = {isa = PBXBuildFile; fileRef = 633BFFC71B950B210007E424 /* AppDelegate.swift */; }; 633BFFCA1B950B210007E424 /* ViewController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 633BFFC91B950B210007E424 /* ViewController.swift */; }; 633BFFCD1B950B210007E424 /* Main.storyboard in Resources */ = {isa = PBXBuildFile; fileRef = 633BFFCB1B950B210007E424 /* Main.storyboard */; }; 633BFFCF1B950B210007E424 /* Images.xcassets in Resources */ = {isa = PBXBuildFile; fileRef = 633BFFCE1B950B210007E424 /* Images.xcassets */; }; - 92EDB1408A1E1E7DDAB25D9C /* libPods-SwiftSample.a in Frameworks */ = {isa = PBXBuildFile; fileRef = 69BB5C6CA3C1F97E007AC527 /* libPods-SwiftSample.a */; }; /* End PBXBuildFile section */ /* Begin PBXFileReference section */ @@ -21,9 +21,8 @@ 633BFFC91B950B210007E424 /* ViewController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ViewController.swift; sourceTree = "<group>"; }; 633BFFCC1B950B210007E424 /* Base */ = {isa = PBXFileReference; lastKnownFileType = file.storyboard; name = Base; path = Base.lproj/Main.storyboard; sourceTree = "<group>"; }; 633BFFCE1B950B210007E424 /* Images.xcassets */ = {isa = PBXFileReference; lastKnownFileType = folder.assetcatalog; path = Images.xcassets; sourceTree = "<group>"; }; - 6367AD231B951655007FD3A4 /* Bridging-Header.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = "Bridging-Header.h"; sourceTree = "<group>"; }; - 69BB5C6CA3C1F97E007AC527 /* libPods-SwiftSample.a */ = {isa = PBXFileReference; explicitFileType = archive.ar; includeInIndex = 0; path = "libPods-SwiftSample.a"; sourceTree = BUILT_PRODUCTS_DIR; }; A7E614A494D89D01BB395761 /* Pods-SwiftSample.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-SwiftSample.debug.xcconfig"; path = "Pods/Target Support Files/Pods-SwiftSample/Pods-SwiftSample.debug.xcconfig"; sourceTree = "<group>"; }; + B394F343BDE186C5436DBDB9 /* Pods_SwiftSample.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_SwiftSample.framework; sourceTree = BUILT_PRODUCTS_DIR; }; C314E3E246AF23AC29B38FCF /* Pods-SwiftSample.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-SwiftSample.release.xcconfig"; path = "Pods/Target Support Files/Pods-SwiftSample/Pods-SwiftSample.release.xcconfig"; sourceTree = "<group>"; }; /* End PBXFileReference section */ @@ -32,7 +31,7 @@ isa = PBXFrameworksBuildPhase; buildActionMask = 2147483647; files = ( - 92EDB1408A1E1E7DDAB25D9C /* libPods-SwiftSample.a in Frameworks */, + 2C0B024CB798839E17F76126 /* Pods_SwiftSample.framework in Frameworks */, ); runOnlyForDeploymentPostprocessing = 0; }; @@ -74,7 +73,6 @@ 633BFFCB1B950B210007E424 /* Main.storyboard */, 633BFFCE1B950B210007E424 /* Images.xcassets */, 633BFFC51B950B210007E424 /* Supporting Files */, - 6367AD231B951655007FD3A4 /* Bridging-Header.h */, ); name = SwiftSample; sourceTree = SOURCE_ROOT; @@ -90,7 +88,7 @@ 9D63A7F6423989BA306810CA /* Frameworks */ = { isa = PBXGroup; children = ( - 69BB5C6CA3C1F97E007AC527 /* libPods-SwiftSample.a */, + B394F343BDE186C5436DBDB9 /* Pods_SwiftSample.framework */, ); name = Frameworks; sourceTree = "<group>"; @@ -125,7 +123,7 @@ isa = PBXProject; attributes = { LastSwiftUpdateCheck = 0710; - LastUpgradeCheck = 0640; + LastUpgradeCheck = 0730; ORGANIZATIONNAME = gRPC; TargetAttributes = { 633BFFC11B950B210007E424 = { @@ -256,6 +254,7 @@ COPY_PHASE_STRIP = NO; DEBUG_INFORMATION_FORMAT = "dwarf-with-dsym"; ENABLE_STRICT_OBJC_MSGSEND = YES; + ENABLE_TESTABILITY = YES; GCC_C_LANGUAGE_STANDARD = gnu99; GCC_DYNAMIC_NO_PIC = NO; GCC_NO_COMMON_BLOCKS = YES; @@ -325,8 +324,9 @@ ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Info.plist; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; + PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; - SWIFT_OBJC_BRIDGING_HEADER = "Bridging-Header.h"; + SWIFT_OBJC_BRIDGING_HEADER = ""; USER_HEADER_SEARCH_PATHS = ""; }; name = Debug; @@ -338,8 +338,9 @@ ASSETCATALOG_COMPILER_APPICON_NAME = AppIcon; INFOPLIST_FILE = Info.plist; LD_RUNPATH_SEARCH_PATHS = "$(inherited) @executable_path/Frameworks"; + PRODUCT_BUNDLE_IDENTIFIER = "io.grpc.$(PRODUCT_NAME:rfc1034identifier)"; PRODUCT_NAME = "$(TARGET_NAME)"; - SWIFT_OBJC_BRIDGING_HEADER = "Bridging-Header.h"; + SWIFT_OBJC_BRIDGING_HEADER = ""; USER_HEADER_SEARCH_PATHS = ""; }; name = Release; diff --git a/src/objective-c/examples/SwiftSample/ViewController.swift b/src/objective-c/examples/SwiftSample/ViewController.swift index 2a95d2de51..e7bab13762 100644 --- a/src/objective-c/examples/SwiftSample/ViewController.swift +++ b/src/objective-c/examples/SwiftSample/ViewController.swift @@ -33,6 +33,8 @@ import UIKit +import RemoteTest + class ViewController: UIViewController { override func viewDidLoad() { @@ -60,7 +62,7 @@ class ViewController: UIViewController { // Same but manipulating headers: - var RPC : ProtoRPC! // Needed to convince Swift to capture by reference (__block) + var RPC : GRPCProtoCall! // Needed to convince Swift to capture by reference (__block) RPC = service.RPCToUnaryCallWithRequest(request) { response, error in if let response = response { NSLog("2. Finished successfully with response:\n\(response)") @@ -79,7 +81,7 @@ class ViewController: UIViewController { // Same example call using the generic gRPC client library: - let method = ProtoMethod(package: "grpc.testing", service: "TestService", method: "UnaryCall") + let method = GRPCProtoMethod(package: "grpc.testing", service: "TestService", method: "UnaryCall") let requestsWriter = GRXWriter(value: request.data()) diff --git a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec index 7d84a5ae4d..53ba101913 100644 --- a/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec +++ b/src/objective-c/tests/RemoteTestClient/RemoteTest.podspec @@ -11,7 +11,7 @@ Pod::Spec.new do |s| s.osx.deployment_target = '10.9' # Run protoc with the Objective-C and gRPC plugins to generate protocol messages and gRPC clients. - s.dependency "!ProtoCompiler-gRPCPlugin", "~> 0.14" + s.dependency "!ProtoCompiler-gRPCPlugin", "~> 1.0.0-pre1" repo_root = '../../../..' bin_dir = "#{repo_root}/bins/$CONFIG" diff --git a/src/objective-c/tests/build_example_test.sh b/src/objective-c/tests/build_example_test.sh index 5c3766b4c0..ae75941ec6 100755 --- a/src/objective-c/tests/build_example_test.sh +++ b/src/objective-c/tests/build_example_test.sh @@ -31,44 +31,33 @@ # Don't run this script standalone. Instead, run from the repository root: # ./tools/run_tests/run_tests.py -l objc -set -eo pipefail +set -evo pipefail cd `dirname $0` -BINDIR=`pwd`/../../../bins/$CONFIG -TMP_PATH=$PATH - -# If `protoc` is not found, add bins/$CONFIG/protobuf/protoc to the search path -hash protoc 2>/dev/null || TMP_PATH=$BINDIR/protobuf:$TMP_PATH - -# If `protoc-gen-objcgrpc` is not found, make a symlink from -# bins/$CONGIF/grpc_objective_c_plugin and add it to the search path -PATH=$TMP_PATH hash protoc-gen-objcgrpc 2>/dev/null || { - ln -sf $BINDIR/grpc_objective_c_plugin $BINDIR/protoc-gen-objcgrpc - TMP_PATH=$BINDIR:$TMP_PATH -} - SCHEME=HelloWorld \ EXAMPLE_PATH=examples/objective-c/helloworld \ - PATH=$TMP_PATH \ ./build_one_example.sh SCHEME=RouteGuideClient \ EXAMPLE_PATH=examples/objective-c/route_guide \ - PATH=$TMP_PATH \ ./build_one_example.sh SCHEME=AuthSample \ EXAMPLE_PATH=examples/objective-c/auth_sample \ - PATH=$TMP_PATH \ + ./build_one_example.sh + +rm -f ../examples/RemoteTestClient/*.{h,m} + +SCHEME=Sample \ + EXAMPLE_PATH=src/objective-c/examples/Sample \ ./build_one_example.sh SCHEME=Sample \ EXAMPLE_PATH=src/objective-c/examples/Sample \ - PATH=$TMP_PATH \ + FRAMEWORKS=YES \ ./build_one_example.sh SCHEME=SwiftSample \ EXAMPLE_PATH=src/objective-c/examples/SwiftSample \ - PATH=$TMP_PATH \ ./build_one_example.sh diff --git a/src/objective-c/tests/build_one_example.sh b/src/objective-c/tests/build_one_example.sh index 24fb8b7bab..9fef6582a3 100755 --- a/src/objective-c/tests/build_one_example.sh +++ b/src/objective-c/tests/build_one_example.sh @@ -31,7 +31,7 @@ # Don't run this script standalone. Instead, run from the repository root: # ./tools/run_tests/run_tests.py -l objc -set -e +set -ev # Params: # EXAMPLE_PATH - directory of the example @@ -54,7 +54,7 @@ pod install set -o pipefail XCODEBUILD_FILTER='(^===|^\*\*|\bfatal\b|\berror\b|\bwarning\b|\bfail)' xcodebuild \ - clean build \ + build \ -workspace *.xcworkspace \ -scheme $SCHEME \ -destination name="iPhone 6" \ diff --git a/src/objective-c/tests/build_tests.sh b/src/objective-c/tests/build_tests.sh index 8547bfd3a8..bc5bc04494 100755 --- a/src/objective-c/tests/build_tests.sh +++ b/src/objective-c/tests/build_tests.sh @@ -44,26 +44,10 @@ hash xcodebuild 2>/dev/null || { exit 1 } -BINDIR=../../../bins/$CONFIG - -if [ ! -f $BINDIR/protobuf/protoc ]; then - hash protoc 2>/dev/null || { - echo >&2 "Can't find protoc. Make sure run_tests.py is making" \ - "grpc_objective_c_plugin before calling this script." - exit 1 - } - # When protoc is already installed, make doesn't compile one. Put a link - # there so the podspecs can do codegen using that path. - mkdir -p $BINDIR/protobuf - ln -s `which protoc` $BINDIR/protobuf/protoc -fi - -[ -f $BINDIR/interop_server ] || { - echo >&2 "Can't find the test server. Make sure run_tests.py is making" \ - "interop_server before calling this script. It needs to be done" \ - "before because pod install of gRPC renames some C gRPC files" \ - "and not the server's code references to them." - exit 1 -} +# clean the directory +rm -rf Pods +rm -rf Tests.xcworkspace +rm -f Podfile.lock +rm -f RemoteTestClient/*.{h,m} pod install diff --git a/src/objective-c/tests/run_tests.sh b/src/objective-c/tests/run_tests.sh index c4fc5644f2..a265149f48 100755 --- a/src/objective-c/tests/run_tests.sh +++ b/src/objective-c/tests/run_tests.sh @@ -31,13 +31,21 @@ # Don't run this script standalone. Instead, run from the repository root: # ./tools/run_tests/run_tests.py -l objc -set -e +set -ev cd $(dirname $0) # Run the tests server. -../../../bins/$CONFIG/interop_server --port=5050 & -../../../bins/$CONFIG/interop_server --port=5051 --use_tls & + +BINDIR=../../../bins/$CONFIG + +[ -f $BINDIR/interop_server ] || { + echo >&2 "Can't find the test server. Make sure run_tests.py is making" \ + "interop_server before calling this script." + exit 1 +} +$BINDIR/interop_server --port=5050 & +$BINDIR/interop_server --port=5051 --use_tls & # Kill them when this script exits. trap 'kill -9 `jobs -p`' EXIT diff --git a/src/php/ext/grpc/call.c b/src/php/ext/grpc/call.c index 2cd45f10dc..bd1eccb01b 100644 --- a/src/php/ext/grpc/call.c +++ b/src/php/ext/grpc/call.c @@ -59,12 +59,15 @@ zend_class_entry *grpc_ce_call; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_call */ void free_wrapped_grpc_call(void *object TSRMLS_DC) { wrapped_grpc_call *call = (wrapped_grpc_call *)object; if (call->owned && call->wrapped != NULL) { grpc_call_destroy(call->wrapped); } + zend_object_std_dtor(&call->std TSRMLS_CC); efree(call); } @@ -203,6 +206,131 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { return true; } +#else + +static zend_object_handlers call_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_call */ +static void free_wrapped_grpc_call(zend_object *object) { + wrapped_grpc_call *call = wrapped_grpc_call_from_obj(object); + if (call->owned && call->wrapped != NULL) { + grpc_call_destroy(call->wrapped); + } + zend_object_std_dtor(&call->std); +} + +/* Initializes an instance of wrapped_grpc_call to be associated with an + * object of a class specified by class_type */ +zend_object *create_wrapped_grpc_call(zend_class_entry *class_type) { + wrapped_grpc_call *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_call) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &call_ce_handlers; + return &intern->std; +} + +/* Wraps a grpc_call struct in a PHP object. Owned indicates whether the + struct should be destroyed at the end of the object's lifecycle */ +void grpc_php_wrap_call(grpc_call *wrapped, bool owned, zval *call_object) { + object_init_ex(call_object, grpc_ce_call); + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(call_object); + call->wrapped = wrapped; + call->owned = owned; +} + +/* Creates and returns a PHP array object with the data in a + * grpc_metadata_array. Returns NULL on failure */ +void grpc_parse_metadata_array(grpc_metadata_array *metadata_array, + zval *array) { + int count = metadata_array->count; + grpc_metadata *elements = metadata_array->metadata; + int i; + zval *data; + HashTable *array_hash; + zval inner_array; + char *str_key; + char *str_val; + size_t key_len; + + array_init(array); + array_hash = HASH_OF(array); + grpc_metadata *elem; + for (i = 0; i < count; i++) { + elem = &elements[i]; + key_len = strlen(elem->key); + str_key = ecalloc(key_len + 1, sizeof(char)); + memcpy(str_key, elem->key, key_len); + str_val = ecalloc(elem->value_length + 1, sizeof(char)); + memcpy(str_val, elem->value, elem->value_length); + if ((data = zend_hash_str_find(array_hash, str_key, key_len)) != NULL) { + if (Z_TYPE_P(data) != IS_ARRAY) { + zend_throw_exception(zend_exception_get_default(), + "Metadata hash somehow contains wrong types.", + 1); + efree(str_key); + efree(str_val); + return; + } + add_next_index_stringl(data, str_val, elem->value_length); + } else { + array_init(&inner_array); + add_next_index_stringl(&inner_array, str_val, elem->value_length); + add_assoc_zval(array, str_key, &inner_array); + } + } +} + +/* Populates a grpc_metadata_array with the data in a PHP array object. + Returns true on success and false on failure */ +bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { + zval *inner_array; + zval *value; + HashTable *array_hash; + HashTable *inner_array_hash; + zend_string *key; + if (Z_TYPE_P(array) != IS_ARRAY) { + return false; + } + grpc_metadata_array_init(metadata); + array_hash = HASH_OF(array); + + ZEND_HASH_FOREACH_STR_KEY_VAL(array_hash, key, inner_array) { + if (key == NULL) { + return false; + } + if (Z_TYPE_P(inner_array) != IS_ARRAY) { + return false; + } + inner_array_hash = HASH_OF(inner_array); + metadata->capacity += zend_hash_num_elements(inner_array_hash); + } + ZEND_HASH_FOREACH_END(); + + metadata->metadata = gpr_malloc(metadata->capacity * sizeof(grpc_metadata)); + + ZEND_HASH_FOREACH_STR_KEY_VAL(array_hash, key, inner_array) { + if (key == NULL) { + return false; + } + inner_array_hash = HASH_OF(inner_array); + + ZEND_HASH_FOREACH_VAL(inner_array_hash, value) { + if (Z_TYPE_P(value) != IS_STRING) { + return false; + } + metadata->metadata[metadata->count].key = ZSTR_VAL(key); + metadata->metadata[metadata->count].value = Z_STRVAL_P(value); + metadata->metadata[metadata->count].value_length = Z_STRLEN_P(value); + metadata->count += 1; + } ZEND_HASH_FOREACH_END(); + } ZEND_HASH_FOREACH_END(); + return true; +} + +#endif + /** * Constructs a new instance of the Call class. * @param Channel $channel The channel to associate the call with. Must not be @@ -211,30 +339,38 @@ bool create_metadata_array(zval *array, grpc_metadata_array *metadata) { * @param Timeval $absolute_deadline The deadline for completing the call */ PHP_METHOD(Call, __construct) { - wrapped_grpc_call *call = - (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); zval *channel_obj; char *method; - int method_len; zval *deadline_obj; char *host_override = NULL; +#if PHP_MAJOR_VERSION < 7 + int method_len; int host_override_len = 0; + wrapped_grpc_call *call = + (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + size_t method_len; + size_t host_override_len = 0; + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); +#endif + /* "OsO|s" == 1 Object, 1 string, 1 Object, 1 optional string */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO|s", - &channel_obj, grpc_ce_channel, - &method, &method_len, - &deadline_obj, grpc_ce_timeval, - &host_override, &host_override_len) - == FAILURE) { - zend_throw_exception( - spl_ce_InvalidArgumentException, - "Call expects a Channel, a String, a Timeval and an optional String", - 1 TSRMLS_CC); + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OsO|s", &channel_obj, + grpc_ce_channel, &method, &method_len, + &deadline_obj, grpc_ce_timeval, &host_override, + &host_override_len) == FAILURE) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Call expects a Channel, a String, a Timeval and " + "an optional String", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel *channel = (wrapped_grpc_channel *)zend_object_store_get_object( channel_obj TSRMLS_CC); +#else + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(channel_obj); +#endif if (channel->wrapped == NULL) { zend_throw_exception(spl_ce_InvalidArgumentException, "Call cannot be constructed from a closed Channel", @@ -242,12 +378,17 @@ PHP_METHOD(Call, __construct) { return; } add_property_zval(getThis(), "channel", channel_obj); +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *deadline = (wrapped_grpc_timeval *)zend_object_store_get_object( deadline_obj TSRMLS_CC); - call->wrapped = grpc_channel_create_call( - channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, completion_queue, method, - host_override, deadline->wrapped, NULL); +#else + wrapped_grpc_timeval *deadline = Z_WRAPPED_GRPC_TIMEVAL_P(deadline_obj); +#endif + call->wrapped = + grpc_channel_create_call(channel->wrapped, NULL, GRPC_PROPAGATE_DEFAULTS, + completion_queue, method, host_override, + deadline->wrapped, NULL); call->owned = true; } @@ -257,22 +398,40 @@ PHP_METHOD(Call, __construct) { * @return object Object with results of all actions */ PHP_METHOD(Call, startBatch) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); - grpc_op ops[8]; - size_t op_num = 0; - zval *array; zval **value; zval **inner_value; - HashTable *array_hash; HashPosition array_pointer; - HashTable *status_hash; - HashTable *message_hash; zval **message_value; zval **message_flags; char *key; uint key_len; ulong index; + zval *result; + zval *recv_status; + MAKE_STD_ZVAL(result); + object_init(result); +#else + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); + zval *value; + zval *inner_value; + zval *message_value; + zval *message_flags; + zend_string *key; + zend_ulong index; + zval recv_status; + object_init(return_value); +#endif + + grpc_op ops[8]; + size_t op_num = 0; + zval *array; + HashTable *array_hash; + HashTable *status_hash; + HashTable *message_hash; + grpc_metadata_array metadata; grpc_metadata_array trailing_metadata; grpc_metadata_array recv_metadata; @@ -283,17 +442,16 @@ PHP_METHOD(Call, startBatch) { grpc_byte_buffer *message; int cancelled; grpc_call_error error; - zval *result; char *message_str; size_t message_len; - zval *recv_status; + + grpc_metadata_array_init(&metadata); grpc_metadata_array_init(&trailing_metadata); grpc_metadata_array_init(&recv_metadata); grpc_metadata_array_init(&recv_trailing_metadata); - MAKE_STD_ZVAL(result); - object_init(result); memset(ops, 0, sizeof(ops)); + /* "a" == 1 array */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "a", &array) == FAILURE) { @@ -301,6 +459,9 @@ PHP_METHOD(Call, startBatch) { "start_batch expects an array", 1 TSRMLS_CC); goto cleanup; } + +#if PHP_MAJOR_VERSION < 7 + array_hash = Z_ARRVAL_P(array); for (zend_hash_internal_pointer_reset_ex(array_hash, &array_pointer); zend_hash_get_current_data_ex(array_hash, (void**)&value, @@ -313,124 +474,250 @@ PHP_METHOD(Call, startBatch) { goto cleanup; } switch(index) { - case GRPC_OP_SEND_INITIAL_METADATA: - if (!create_metadata_array(*value, &metadata)) { + case GRPC_OP_SEND_INITIAL_METADATA: + if (!create_metadata_array(*value, &metadata)) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Bad metadata value given", 1 TSRMLS_CC); + goto cleanup; + } + ops[op_num].data.send_initial_metadata.count = + metadata.count; + ops[op_num].data.send_initial_metadata.metadata = + metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + if (Z_TYPE_PP(value) != IS_ARRAY) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected an array for send message", + 1 TSRMLS_CC); + goto cleanup; + } + message_hash = Z_ARRVAL_PP(value); + if (zend_hash_find(message_hash, "flags", sizeof("flags"), + (void **)&message_flags) == SUCCESS) { + if (Z_TYPE_PP(message_flags) != IS_LONG) { zend_throw_exception(spl_ce_InvalidArgumentException, - "Bad metadata value given", 1 TSRMLS_CC); - goto cleanup; + "Expected an int for message flags", + 1 TSRMLS_CC); } - ops[op_num].data.send_initial_metadata.count = - metadata.count; - ops[op_num].data.send_initial_metadata.metadata = - metadata.metadata; - break; - case GRPC_OP_SEND_MESSAGE: - if (Z_TYPE_PP(value) != IS_ARRAY) { + ops[op_num].flags = Z_LVAL_PP(message_flags) & GRPC_WRITE_USED_MASK; + } + if (zend_hash_find(message_hash, "message", sizeof("message"), + (void **)&message_value) != SUCCESS || + Z_TYPE_PP(message_value) != IS_STRING) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected a string for send message", + 1 TSRMLS_CC); + goto cleanup; + } + ops[op_num].data.send_message = + string_to_byte_buffer(Z_STRVAL_PP(message_value), + Z_STRLEN_PP(message_value)); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + status_hash = Z_ARRVAL_PP(value); + if (zend_hash_find(status_hash, "metadata", sizeof("metadata"), + (void **)&inner_value) == SUCCESS) { + if (!create_metadata_array(*inner_value, &trailing_metadata)) { zend_throw_exception(spl_ce_InvalidArgumentException, - "Expected an array for send message", + "Bad trailing metadata value given", 1 TSRMLS_CC); goto cleanup; } - message_hash = Z_ARRVAL_PP(value); - if (zend_hash_find(message_hash, "flags", sizeof("flags"), - (void **)&message_flags) == SUCCESS) { - if (Z_TYPE_PP(message_flags) != IS_LONG) { - zend_throw_exception(spl_ce_InvalidArgumentException, - "Expected an int for message flags", - 1 TSRMLS_CC); - } - ops[op_num].flags = Z_LVAL_PP(message_flags) & GRPC_WRITE_USED_MASK; + ops[op_num].data.send_status_from_server.trailing_metadata = + trailing_metadata.metadata; + ops[op_num].data.send_status_from_server.trailing_metadata_count = + trailing_metadata.count; + } + if (zend_hash_find(status_hash, "code", sizeof("code"), + (void**)&inner_value) == SUCCESS) { + if (Z_TYPE_PP(inner_value) != IS_LONG) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Status code must be an integer", + 1 TSRMLS_CC); + goto cleanup; } - if (zend_hash_find(message_hash, "message", sizeof("message"), - (void **)&message_value) != SUCCESS || - Z_TYPE_PP(message_value) != IS_STRING) { + ops[op_num].data.send_status_from_server.status = + Z_LVAL_PP(inner_value); + } else { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Integer status code is required", + 1 TSRMLS_CC); + goto cleanup; + } + if (zend_hash_find(status_hash, "details", sizeof("details"), + (void**)&inner_value) == SUCCESS) { + if (Z_TYPE_PP(inner_value) != IS_STRING) { zend_throw_exception(spl_ce_InvalidArgumentException, - "Expected a string for send message", + "Status details must be a string", 1 TSRMLS_CC); goto cleanup; } - ops[op_num].data.send_message = - string_to_byte_buffer(Z_STRVAL_PP(message_value), - Z_STRLEN_PP(message_value)); - break; - case GRPC_OP_SEND_CLOSE_FROM_CLIENT: - break; - case GRPC_OP_SEND_STATUS_FROM_SERVER: - status_hash = Z_ARRVAL_PP(value); - if (zend_hash_find(status_hash, "metadata", sizeof("metadata"), - (void **)&inner_value) == SUCCESS) { - if (!create_metadata_array(*inner_value, &trailing_metadata)) { - zend_throw_exception(spl_ce_InvalidArgumentException, - "Bad trailing metadata value given", - 1 TSRMLS_CC); - goto cleanup; - } - ops[op_num].data.send_status_from_server.trailing_metadata = - trailing_metadata.metadata; - ops[op_num].data.send_status_from_server.trailing_metadata_count = - trailing_metadata.count; + ops[op_num].data.send_status_from_server.status_details = + Z_STRVAL_PP(inner_value); + } else { + zend_throw_exception(spl_ce_InvalidArgumentException, + "String status details is required", + 1 TSRMLS_CC); + goto cleanup; + } + break; + case GRPC_OP_RECV_INITIAL_METADATA: + ops[op_num].data.recv_initial_metadata = &recv_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + ops[op_num].data.recv_message = &message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + ops[op_num].data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata; + ops[op_num].data.recv_status_on_client.status = &status; + ops[op_num].data.recv_status_on_client.status_details = + &status_details; + ops[op_num].data.recv_status_on_client.status_details_capacity = + &status_details_capacity; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + ops[op_num].data.recv_close_on_server.cancelled = &cancelled; + break; + default: + zend_throw_exception(spl_ce_InvalidArgumentException, + "Unrecognized key in batch", 1 TSRMLS_CC); + goto cleanup; + } + ops[op_num].op = (grpc_op_type)index; + ops[op_num].flags = 0; + ops[op_num].reserved = NULL; + op_num++; + } + +#else + +array_hash = HASH_OF(array); + ZEND_HASH_FOREACH_KEY_VAL(array_hash, index, key, value) { + if (key) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "batch keys must be integers", 1); + goto cleanup; + } + + switch(index) { + case GRPC_OP_SEND_INITIAL_METADATA: + if (!create_metadata_array(value, &metadata)) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Bad metadata value given", 1); + goto cleanup; + } + ops[op_num].data.send_initial_metadata.count = metadata.count; + ops[op_num].data.send_initial_metadata.metadata = metadata.metadata; + break; + case GRPC_OP_SEND_MESSAGE: + if (Z_TYPE_P(value) != IS_ARRAY) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected an array for send message", 1); + goto cleanup; + } + message_hash = HASH_OF(value); + if ((message_flags = + zend_hash_str_find(message_hash, "flags", + sizeof("flags") - 1)) != NULL) { + if (Z_TYPE_P(message_flags) != IS_LONG) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected an int for message flags", 1); + } + ops[op_num].flags = Z_LVAL_P(message_flags) & GRPC_WRITE_USED_MASK; + } + if ((message_value = zend_hash_str_find(message_hash, "message", + sizeof("message") - 1)) + == NULL || Z_TYPE_P(message_value) != IS_STRING) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Expected a string for send message", 1); + goto cleanup; + } + ops[op_num].data.send_message = + string_to_byte_buffer(Z_STRVAL_P(message_value), + Z_STRLEN_P(message_value)); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + status_hash = HASH_OF(value); + if ((inner_value = zend_hash_str_find(status_hash, "metadata", + sizeof("metadata") - 1)) + != NULL) { + if (!create_metadata_array(inner_value, &trailing_metadata)) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Bad trailing metadata value given", 1); + goto cleanup; } - if (zend_hash_find(status_hash, "code", sizeof("code"), - (void**)&inner_value) == SUCCESS) { - if (Z_TYPE_PP(inner_value) != IS_LONG) { - zend_throw_exception(spl_ce_InvalidArgumentException, - "Status code must be an integer", - 1 TSRMLS_CC); - goto cleanup; - } - ops[op_num].data.send_status_from_server.status = - Z_LVAL_PP(inner_value); - } else { + ops[op_num].data.send_status_from_server.trailing_metadata = + trailing_metadata.metadata; + ops[op_num].data.send_status_from_server.trailing_metadata_count = + trailing_metadata.count; + } + if ((inner_value = zend_hash_str_find(status_hash, "code", + sizeof("code") - 1)) != NULL) { + if (Z_TYPE_P(inner_value) != IS_LONG) { zend_throw_exception(spl_ce_InvalidArgumentException, - "Integer status code is required", - 1 TSRMLS_CC); + "Status code must be an integer", 1); goto cleanup; } - if (zend_hash_find(status_hash, "details", sizeof("details"), - (void**)&inner_value) == SUCCESS) { - if (Z_TYPE_PP(inner_value) != IS_STRING) { - zend_throw_exception(spl_ce_InvalidArgumentException, - "Status details must be a string", - 1 TSRMLS_CC); - goto cleanup; - } - ops[op_num].data.send_status_from_server.status_details = - Z_STRVAL_PP(inner_value); - } else { + ops[op_num].data.send_status_from_server.status = + Z_LVAL_P(inner_value); + } else { + zend_throw_exception(spl_ce_InvalidArgumentException, + "Integer status code is required", 1); + goto cleanup; + } + if ((inner_value = zend_hash_str_find(status_hash, "details", + sizeof("details") - 1)) != NULL) { + if (Z_TYPE_P(inner_value) != IS_STRING) { zend_throw_exception(spl_ce_InvalidArgumentException, - "String status details is required", - 1 TSRMLS_CC); + "Status details must be a string", 1); goto cleanup; } - break; - case GRPC_OP_RECV_INITIAL_METADATA: - ops[op_num].data.recv_initial_metadata = &recv_metadata; - break; - case GRPC_OP_RECV_MESSAGE: - ops[op_num].data.recv_message = &message; - break; - case GRPC_OP_RECV_STATUS_ON_CLIENT: - ops[op_num].data.recv_status_on_client.trailing_metadata = - &recv_trailing_metadata; - ops[op_num].data.recv_status_on_client.status = &status; - ops[op_num].data.recv_status_on_client.status_details = - &status_details; - ops[op_num].data.recv_status_on_client.status_details_capacity = - &status_details_capacity; - break; - case GRPC_OP_RECV_CLOSE_ON_SERVER: - ops[op_num].data.recv_close_on_server.cancelled = &cancelled; - break; - default: + ops[op_num].data.send_status_from_server.status_details = + Z_STRVAL_P(inner_value); + } else { zend_throw_exception(spl_ce_InvalidArgumentException, - "Unrecognized key in batch", 1 TSRMLS_CC); + "String status details is required", 1); goto cleanup; + } + break; + case GRPC_OP_RECV_INITIAL_METADATA: + ops[op_num].data.recv_initial_metadata = &recv_metadata; + break; + case GRPC_OP_RECV_MESSAGE: + ops[op_num].data.recv_message = &message; + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + ops[op_num].data.recv_status_on_client.trailing_metadata = + &recv_trailing_metadata; + ops[op_num].data.recv_status_on_client.status = &status; + ops[op_num].data.recv_status_on_client.status_details = + &status_details; + ops[op_num].data.recv_status_on_client.status_details_capacity = + &status_details_capacity; + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + ops[op_num].data.recv_close_on_server.cancelled = &cancelled; + break; + default: + zend_throw_exception(spl_ce_InvalidArgumentException, + "Unrecognized key in batch", 1); + goto cleanup; } ops[op_num].op = (grpc_op_type)index; ops[op_num].flags = 0; ops[op_num].reserved = NULL; op_num++; } + ZEND_HASH_FOREACH_END(); + +#endif + error = grpc_call_start_batch(call->wrapped, ops, op_num, call->wrapped, NULL); if (error != GRPC_CALL_OK) { @@ -441,52 +728,98 @@ PHP_METHOD(Call, startBatch) { } grpc_completion_queue_pluck(completion_queue, call->wrapped, gpr_inf_future(GPR_CLOCK_REALTIME), NULL); +#if PHP_MAJOR_VERSION < 7 for (int i = 0; i < op_num; i++) { switch(ops[i].op) { - case GRPC_OP_SEND_INITIAL_METADATA: - add_property_bool(result, "send_metadata", true); - break; - case GRPC_OP_SEND_MESSAGE: - add_property_bool(result, "send_message", true); - break; - case GRPC_OP_SEND_CLOSE_FROM_CLIENT: - add_property_bool(result, "send_close", true); - break; - case GRPC_OP_SEND_STATUS_FROM_SERVER: - add_property_bool(result, "send_status", true); - break; - case GRPC_OP_RECV_INITIAL_METADATA: - array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC); - add_property_zval(result, "metadata", array); - Z_DELREF_P(array); - break; - case GRPC_OP_RECV_MESSAGE: - byte_buffer_to_string(message, &message_str, &message_len); - if (message_str == NULL) { - add_property_null(result, "message"); - } else { - add_property_stringl(result, "message", message_str, message_len, - false); - } - break; - case GRPC_OP_RECV_STATUS_ON_CLIENT: - MAKE_STD_ZVAL(recv_status); - object_init(recv_status); - array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC); - add_property_zval(recv_status, "metadata", array); - Z_DELREF_P(array); - add_property_long(recv_status, "code", status); - add_property_string(recv_status, "details", status_details, true); - add_property_zval(result, "status", recv_status); - Z_DELREF_P(recv_status); - break; - case GRPC_OP_RECV_CLOSE_ON_SERVER: - add_property_bool(result, "cancelled", cancelled); - break; - default: - break; + case GRPC_OP_SEND_INITIAL_METADATA: + add_property_bool(result, "send_metadata", true); + break; + case GRPC_OP_SEND_MESSAGE: + add_property_bool(result, "send_message", true); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + add_property_bool(result, "send_close", true); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + add_property_bool(result, "send_status", true); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + array = grpc_parse_metadata_array(&recv_metadata TSRMLS_CC); + add_property_zval(result, "metadata", array); + Z_DELREF_P(array); + break; + case GRPC_OP_RECV_MESSAGE: + byte_buffer_to_string(message, &message_str, &message_len); + if (message_str == NULL) { + add_property_null(result, "message"); + } else { + add_property_stringl(result, "message", message_str, message_len, + false); + } + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + MAKE_STD_ZVAL(recv_status); + object_init(recv_status); + array = grpc_parse_metadata_array(&recv_trailing_metadata TSRMLS_CC); + add_property_zval(recv_status, "metadata", array); + Z_DELREF_P(array); + add_property_long(recv_status, "code", status); + add_property_string(recv_status, "details", status_details, true); + add_property_zval(result, "status", recv_status); + Z_DELREF_P(recv_status); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + add_property_bool(result, "cancelled", cancelled); + break; + default: + break; + } + } +#else + for (int i = 0; i < op_num; i++) { + switch(ops[i].op) { + case GRPC_OP_SEND_INITIAL_METADATA: + add_property_bool(return_value, "send_metadata", true); + break; + case GRPC_OP_SEND_MESSAGE: + add_property_bool(return_value, "send_message", true); + break; + case GRPC_OP_SEND_CLOSE_FROM_CLIENT: + add_property_bool(return_value, "send_close", true); + break; + case GRPC_OP_SEND_STATUS_FROM_SERVER: + add_property_bool(return_value, "send_status", true); + break; + case GRPC_OP_RECV_INITIAL_METADATA: + grpc_parse_metadata_array(&recv_metadata, array); + add_property_zval(return_value, "metadata", array); + break; + case GRPC_OP_RECV_MESSAGE: + byte_buffer_to_string(message, &message_str, &message_len); + if (message_str == NULL) { + add_property_null(return_value, "message"); + } else { + add_property_stringl(return_value, "message", message_str, + message_len); + } + break; + case GRPC_OP_RECV_STATUS_ON_CLIENT: + object_init(&recv_status); + grpc_parse_metadata_array(&recv_trailing_metadata, array); + add_property_zval(&recv_status, "metadata", array); + add_property_long(&recv_status, "code", status); + add_property_string(&recv_status, "details", status_details); + add_property_zval(return_value, "status", &recv_status); + break; + case GRPC_OP_RECV_CLOSE_ON_SERVER: + add_property_bool(return_value, "cancelled", cancelled); + break; + default: + break; } } +#endif + cleanup: grpc_metadata_array_destroy(&metadata); grpc_metadata_array_destroy(&trailing_metadata); @@ -503,7 +836,11 @@ cleanup: grpc_byte_buffer_destroy(message); } } +#if PHP_MAJOR_VERSION < 7 RETURN_DESTROY_ZVAL(result); +#else + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -511,9 +848,14 @@ cleanup: * @return string The URI of the endpoint */ PHP_METHOD(Call, getPeer) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); RETURN_STRING(grpc_call_get_peer(call->wrapped), 1); +#else + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); + RETURN_STRING(grpc_call_get_peer(call->wrapped)); +#endif } /** @@ -521,8 +863,12 @@ PHP_METHOD(Call, getPeer) { * has not already ended with another status. */ PHP_METHOD(Call, cancel) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); +#endif grpc_call_cancel(call->wrapped, NULL); } @@ -543,12 +889,17 @@ PHP_METHOD(Call, setCredentials) { return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_call_credentials *creds = (wrapped_grpc_call_credentials *)zend_object_store_get_object( creds_obj TSRMLS_CC); - wrapped_grpc_call *call = (wrapped_grpc_call *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_call_credentials *creds = + Z_WRAPPED_GRPC_CALL_CREDS_P(creds_obj); + wrapped_grpc_call *call = Z_WRAPPED_GRPC_CALL_P(getThis()); +#endif grpc_call_error error = GRPC_CALL_ERROR; error = grpc_call_set_credentials(call->wrapped, creds->wrapped); @@ -556,16 +907,23 @@ PHP_METHOD(Call, setCredentials) { } static zend_function_entry call_methods[] = { - PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Call, startBatch, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Call, getPeer, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Call, setCredentials, NULL, ZEND_ACC_PUBLIC) - PHP_FE_END}; + PHP_ME(Call, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) + PHP_ME(Call, startBatch, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, getPeer, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, cancel, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Call, setCredentials, NULL, ZEND_ACC_PUBLIC) + PHP_FE_END +}; void grpc_init_call(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\Call", call_methods); ce.create_object = create_wrapped_grpc_call; grpc_ce_call = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&call_ce_handlers, zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + call_ce_handlers.offset = XtOffsetOf(wrapped_grpc_call, std); + call_ce_handlers.free_obj = free_wrapped_grpc_call; +#endif } diff --git a/src/php/ext/grpc/call.h b/src/php/ext/grpc/call.h index 36c5f2d272..9fc52d7820 100644 --- a/src/php/ext/grpc/call.h +++ b/src/php/ext/grpc/call.h @@ -48,17 +48,15 @@ /* Class entry for the Call PHP class */ extern zend_class_entry *grpc_ce_call; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_call that can be associated with a PHP object */ typedef struct wrapped_grpc_call { zend_object std; - bool owned; grpc_call *wrapped; } wrapped_grpc_call; -/* Initializes the Call PHP class */ -void grpc_init_call(TSRMLS_D); - /* Creates a Call object that wraps the given grpc_call struct */ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC); @@ -66,6 +64,36 @@ zval *grpc_php_wrap_call(grpc_call *wrapped, bool owned TSRMLS_DC); * call metadata */ zval *grpc_parse_metadata_array(grpc_metadata_array *metadata_array TSRMLS_DC); +#else + +/* Wrapper struct for grpc_call that can be associated with a PHP object */ +typedef struct wrapped_grpc_call { + bool owned; + grpc_call *wrapped; + zend_object std; +} wrapped_grpc_call; + +static inline wrapped_grpc_call +*wrapped_grpc_call_from_obj(zend_object *obj) { + return (wrapped_grpc_call*)((char*)(obj) - + XtOffsetOf(wrapped_grpc_call, std)); +} + +#define Z_WRAPPED_GRPC_CALL_P(zv) wrapped_grpc_call_from_obj(Z_OBJ_P((zv))) + +/* Creates a Call object that wraps the given grpc_call struct */ +void grpc_php_wrap_call(grpc_call *wrapped, bool owned, zval *call_object); + +/* Creates and returns a PHP associative array of metadata from a C array of + * call metadata */ +void grpc_parse_metadata_array(grpc_metadata_array *metadata_array, + zval *array); + +#endif /* PHP_MAJOR_VERSION */ + +/* Initializes the Call PHP class */ +void grpc_init_call(TSRMLS_D); + /* Populates a grpc_metadata_array with the data in a PHP array object. Returns true on success and false on failure */ bool create_metadata_array(zval *array, grpc_metadata_array *metadata); diff --git a/src/php/ext/grpc/call_credentials.c b/src/php/ext/grpc/call_credentials.c index ec0e6b9181..70aaffb848 100644 --- a/src/php/ext/grpc/call_credentials.c +++ b/src/php/ext/grpc/call_credentials.c @@ -53,6 +53,8 @@ zend_class_entry *grpc_ce_call_credentials; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_call_credentials */ void free_wrapped_grpc_call_credentials(void *object TSRMLS_DC) { wrapped_grpc_call_credentials *creds = @@ -60,6 +62,7 @@ void free_wrapped_grpc_call_credentials(void *object TSRMLS_DC) { if (creds->wrapped != NULL) { grpc_call_credentials_release(creds->wrapped); } + zend_object_std_dtor(&creds->std TSRMLS_CC); efree(creds); } @@ -94,6 +97,43 @@ zval *grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped TSRMLS_DC) { return credentials_object; } +#else + +static zend_object_handlers call_credentials_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_call_credentials */ +static void free_wrapped_grpc_call_credentials(zend_object *object) { + wrapped_grpc_call_credentials *creds = + wrapped_grpc_call_creds_from_obj(object); + if (creds->wrapped != NULL) { + grpc_call_credentials_release(creds->wrapped); + } + zend_object_std_dtor(&creds->std); +} + +/* Initializes an instance of wrapped_grpc_call_credentials to be + * associated with an object of a class specified by class_type */ +zend_object *create_wrapped_grpc_call_credentials(zend_class_entry + *class_type) { + wrapped_grpc_call_credentials *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_call_credentials) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &call_credentials_ce_handlers; + return &intern->std; +} + +void grpc_php_wrap_call_credentials(grpc_call_credentials *wrapped, + zval *credentials_object) { + object_init_ex(credentials_object, grpc_ce_call_credentials); + wrapped_grpc_call_credentials *credentials = + Z_WRAPPED_GRPC_CALL_CREDS_P(credentials_object); + credentials->wrapped = wrapped; +} + +#endif + /** * Create composite credentials from two existing credentials. * @param CallCredentials cred1 The first credential @@ -113,6 +153,7 @@ PHP_METHOD(CallCredentials, createComposite) { 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_call_credentials *cred1 = (wrapped_grpc_call_credentials *)zend_object_store_get_object( cred1_obj TSRMLS_CC); @@ -124,6 +165,17 @@ PHP_METHOD(CallCredentials, createComposite) { NULL); zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + wrapped_grpc_call_credentials *cred1 = + Z_WRAPPED_GRPC_CALL_CREDS_P(cred1_obj); + wrapped_grpc_call_credentials *cred2 = + Z_WRAPPED_GRPC_CALL_CREDS_P(cred2_obj); + grpc_call_credentials *creds = + grpc_composite_call_credentials_create(cred1->wrapped, + cred2->wrapped, NULL); + grpc_php_wrap_call_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -141,13 +193,10 @@ PHP_METHOD(CallCredentials, createFromPlugin) { memset(fci_cache, 0, sizeof(zend_fcall_info_cache)); /* "f" == 1 function */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f", fci, - fci_cache, - fci->params, - fci->param_count) == FAILURE) { + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "f*", fci, fci_cache, + fci->params, fci->param_count) == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, - "createFromPlugin expects 1 callback", - 1 TSRMLS_CC); + "createFromPlugin expects 1 callback", 1 TSRMLS_CC); return; } @@ -165,10 +214,15 @@ PHP_METHOD(CallCredentials, createFromPlugin) { plugin.state = (void *)state; plugin.type = ""; - grpc_call_credentials *creds = grpc_metadata_credentials_create_from_plugin( - plugin, NULL); + grpc_call_credentials *creds = + grpc_metadata_credentials_create_from_plugin(plugin, NULL); +#if PHP_MAJOR_VERSION < 7 zval *creds_object = grpc_php_wrap_call_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + grpc_php_wrap_call_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /* Callback function for plugin creds API */ @@ -181,6 +235,7 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, /* prepare to call the user callback function with info from the * grpc_auth_metadata_context */ +#if PHP_MAJOR_VERSION < 7 zval **params[1]; zval *arg; zval *retval; @@ -192,21 +247,41 @@ void plugin_get_metadata(void *ptr, grpc_auth_metadata_context context, state->fci->param_count = 1; state->fci->params = params; state->fci->retval_ptr_ptr = &retval; +#else + zval arg; + zval retval; + object_init(&arg); + add_property_string(&arg, "service_url", context.service_url); + add_property_string(&arg, "method_name", context.method_name); + state->fci->param_count = 1; + state->fci->params = &arg; + state->fci->retval = &retval; +#endif /* call the user callback function */ zend_call_function(state->fci, state->fci_cache TSRMLS_CC); +#if PHP_MAJOR_VERSION < 7 if (Z_TYPE_P(retval) != IS_ARRAY) { +#else + if (Z_TYPE_P(&retval) != IS_ARRAY) { +#endif zend_throw_exception(spl_ce_InvalidArgumentException, "plugin callback must return metadata array", 1 TSRMLS_CC); + return; } grpc_metadata_array metadata; +#if PHP_MAJOR_VERSION < 7 if (!create_metadata_array(retval, &metadata)) { +#else + if (!create_metadata_array(&retval, &metadata)) { +#endif zend_throw_exception(spl_ce_InvalidArgumentException, "invalid metadata", 1 TSRMLS_CC); grpc_metadata_array_destroy(&metadata); + return; } /* TODO: handle error */ @@ -229,11 +304,21 @@ static zend_function_entry call_credentials_methods[] = { ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(CallCredentials, createFromPlugin, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_FE_END}; + PHP_FE_END +}; void grpc_init_call_credentials(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\CallCredentials", call_credentials_methods); ce.create_object = create_wrapped_grpc_call_credentials; grpc_ce_call_credentials = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&call_credentials_ce_handlers, + zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + call_credentials_ce_handlers.offset = + XtOffsetOf(wrapped_grpc_call_credentials, std); + call_credentials_ce_handlers.free_obj = + free_wrapped_grpc_call_credentials; +#endif } diff --git a/src/php/ext/grpc/call_credentials.h b/src/php/ext/grpc/call_credentials.h index d2f6a92449..e05e14698b 100755 --- a/src/php/ext/grpc/call_credentials.h +++ b/src/php/ext/grpc/call_credentials.h @@ -49,14 +49,37 @@ /* Class entry for the CallCredentials PHP class */ extern zend_class_entry *grpc_ce_call_credentials; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_call_credentials that can be associated * with a PHP object */ typedef struct wrapped_grpc_call_credentials { zend_object std; + grpc_call_credentials *wrapped; +} wrapped_grpc_call_credentials; +#else + +/* Wrapper struct for grpc_call_credentials that can be associated + * with a PHP object */ +typedef struct wrapped_grpc_call_credentials { grpc_call_credentials *wrapped; + zend_object std; } wrapped_grpc_call_credentials; +static inline wrapped_grpc_call_credentials +*wrapped_grpc_call_creds_from_obj(zend_object *obj) { + return + (wrapped_grpc_call_credentials*)((char*)(obj) - + XtOffsetOf(wrapped_grpc_call_credentials, + std)); +} + +#define Z_WRAPPED_GRPC_CALL_CREDS_P(zv) \ + wrapped_grpc_call_creds_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Struct to hold callback function for plugin creds API */ typedef struct plugin_state { zend_fcall_info *fci; diff --git a/src/php/ext/grpc/channel.c b/src/php/ext/grpc/channel.c index 8d94c59683..6737e340f9 100644 --- a/src/php/ext/grpc/channel.c +++ b/src/php/ext/grpc/channel.c @@ -57,12 +57,15 @@ zend_class_entry *grpc_ce_channel; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_channel */ void free_wrapped_grpc_channel(void *object TSRMLS_DC) { wrapped_grpc_channel *channel = (wrapped_grpc_channel *)object; if (channel->wrapped != NULL) { grpc_channel_destroy(channel->wrapped); } + zend_object_std_dtor(&channel->std TSRMLS_CC); efree(channel); } @@ -83,7 +86,8 @@ zend_object_value create_wrapped_grpc_channel(zend_class_entry *class_type return retval; } -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC) { +void php_grpc_read_args_array(zval *args_array, + grpc_channel_args *args TSRMLS_DC) { HashTable *array_hash; HashPosition array_pointer; int args_index; @@ -107,23 +111,88 @@ void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_D } args->args[args_index].key = key; switch (Z_TYPE_P(*data)) { - case IS_LONG: - args->args[args_index].value.integer = (int)Z_LVAL_P(*data); - args->args[args_index].type = GRPC_ARG_INTEGER; - break; - case IS_STRING: - args->args[args_index].value.string = Z_STRVAL_P(*data); - args->args[args_index].type = GRPC_ARG_STRING; - break; - default: - zend_throw_exception(spl_ce_InvalidArgumentException, - "args values must be int or string", 1 TSRMLS_CC); - return; + case IS_LONG: + args->args[args_index].value.integer = (int)Z_LVAL_P(*data); + args->args[args_index].type = GRPC_ARG_INTEGER; + break; + case IS_STRING: + args->args[args_index].value.string = Z_STRVAL_P(*data); + args->args[args_index].type = GRPC_ARG_STRING; + break; + default: + zend_throw_exception(spl_ce_InvalidArgumentException, + "args values must be int or string", 1 TSRMLS_CC); + return; } args_index++; } } +#else + +static zend_object_handlers channel_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_channel */ +static void free_wrapped_grpc_channel(zend_object *object) { + wrapped_grpc_channel *channel = wrapped_grpc_channel_from_obj(object); + if (channel->wrapped != NULL) { + grpc_channel_destroy(channel->wrapped); + } + zend_object_std_dtor(&channel->std); +} + +/* Initializes an instance of wrapped_grpc_channel to be associated with an + * object of a class specified by class_type */ +zend_object *create_wrapped_grpc_channel(zend_class_entry *class_type) { + wrapped_grpc_channel *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_channel) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &channel_ce_handlers; + return &intern->std; +} + +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args) { + HashTable *array_hash; + int args_index; + zval *data; + zend_string *key; + array_hash = HASH_OF(args_array); + if (!array_hash) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "array_hash is NULL", 1); + return; + } + args->num_args = zend_hash_num_elements(array_hash); + args->args = ecalloc(args->num_args, sizeof(grpc_arg)); + args_index = 0; + ZEND_HASH_FOREACH_STR_KEY_VAL(array_hash, key, data) { + if (key == NULL) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "args keys must be strings", 1); + } + args->args[args_index].key = ZSTR_VAL(key); + switch (Z_TYPE_P(data)) { + case IS_LONG: + args->args[args_index].value.integer = (int)Z_LVAL_P(data); + args->args[args_index].type = GRPC_ARG_INTEGER; + break; + case IS_STRING: + args->args[args_index].value.string = Z_STRVAL_P(data); + args->args[args_index].type = GRPC_ARG_STRING; + break; + default: + zend_throw_exception(spl_ce_InvalidArgumentException, + "args values must be int or string", 1); + return; + } + args_index++; + } ZEND_HASH_FOREACH_END(); +} + +#endif + /** * Construct an instance of the Channel class. If the $args array contains a * "credentials" key mapping to a ChannelCredentials object, a secure channel @@ -132,16 +201,23 @@ void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_D * @param array $args The arguments to pass to the Channel (optional) */ PHP_METHOD(Channel, __construct) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel *channel = (wrapped_grpc_channel *)zend_object_store_get_object( getThis() TSRMLS_CC); - char *target; + zval **creds_obj = NULL; int target_length; +#else + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); + zval *creds_obj = NULL; + size_t target_length; +#endif + char *target; zval *args_array = NULL; grpc_channel_args args; HashTable *array_hash; - zval **creds_obj = NULL; wrapped_grpc_channel_credentials *creds = NULL; + /* "sa" == 1 string, 1 array */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sa", &target, &target_length, &args_array) == FAILURE) { @@ -149,6 +225,7 @@ PHP_METHOD(Channel, __construct) { "Channel expects a string and an array", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 array_hash = Z_ARRVAL_P(args_array); if (zend_hash_find(array_hash, "credentials", sizeof("credentials"), (void **)&creds_obj) == SUCCESS) { @@ -167,6 +244,24 @@ PHP_METHOD(Channel, __construct) { zend_hash_del(array_hash, "credentials", 12); } } +#else + array_hash = HASH_OF(args_array); + if ((creds_obj = zend_hash_str_find(array_hash, "credentials", + sizeof("credentials") - 1)) != NULL) { + if (Z_TYPE_P(creds_obj) == IS_NULL) { + creds = NULL; + zend_hash_str_del(array_hash, "credentials", sizeof("credentials") - 1); + } else if (Z_OBJ_P(creds_obj)->ce != grpc_ce_channel_credentials) { + zend_throw_exception(spl_ce_InvalidArgumentException, + "credentials must be a ChannelCredentials object", + 1); + return; + } else { + creds = Z_WRAPPED_GRPC_CHANNEL_CREDS_P(creds_obj); + zend_hash_str_del(array_hash, "credentials", sizeof("credentials") - 1); + } + } +#endif php_grpc_read_args_array(args_array, &args TSRMLS_CC); if (creds == NULL) { channel->wrapped = grpc_insecure_channel_create(target, &args, NULL); @@ -182,9 +277,14 @@ PHP_METHOD(Channel, __construct) { * @return string The URI of the endpoint */ PHP_METHOD(Channel, getTarget) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel *channel = - (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); + (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); RETURN_STRING(grpc_channel_get_target(channel->wrapped), 1); +#else + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); + RETURN_STRING(grpc_channel_get_target(channel->wrapped)); +#endif } /** @@ -193,12 +293,17 @@ PHP_METHOD(Channel, getTarget) { * @return long The grpc connectivity state */ PHP_METHOD(Channel, getConnectivityState) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel *channel = (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); - bool try_to_connect; +#else + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); +#endif + bool try_to_connect = false; + /* "|b" == 1 optional bool */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &try_to_connect) == - FAILURE) { + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|b", &try_to_connect) + == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, "getConnectivityState expects a bool", 1 TSRMLS_CC); return; @@ -215,10 +320,16 @@ PHP_METHOD(Channel, getConnectivityState) { * before deadline */ PHP_METHOD(Channel, watchConnectivityState) { +#if PHP_MAJOR_VERSION < 7 + long last_state; wrapped_grpc_channel *channel = (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); - long last_state; +#else + zend_long last_state; + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); +#endif zval *deadline_obj; + /* "lO" == 1 long 1 object */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "lO", &last_state, &deadline_obj, grpc_ce_timeval) == FAILURE) { @@ -228,15 +339,20 @@ PHP_METHOD(Channel, watchConnectivityState) { return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *deadline = (wrapped_grpc_timeval *)zend_object_store_get_object( deadline_obj TSRMLS_CC); - grpc_channel_watch_connectivity_state( - channel->wrapped, (grpc_connectivity_state)last_state, - deadline->wrapped, completion_queue, NULL); - grpc_event event = grpc_completion_queue_pluck( - completion_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); +#else + wrapped_grpc_timeval *deadline = Z_WRAPPED_GRPC_TIMEVAL_P(deadline_obj); +#endif + grpc_channel_watch_connectivity_state(channel->wrapped, + (grpc_connectivity_state)last_state, + deadline->wrapped, completion_queue, + NULL); + grpc_event event = + grpc_completion_queue_pluck(completion_queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); RETURN_BOOL(event.success); } @@ -244,8 +360,12 @@ PHP_METHOD(Channel, watchConnectivityState) { * Close the channel */ PHP_METHOD(Channel, close) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel *channel = - (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); + (wrapped_grpc_channel *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_channel *channel = Z_WRAPPED_GRPC_CHANNEL_P(getThis()); +#endif if (channel->wrapped != NULL) { grpc_channel_destroy(channel->wrapped); channel->wrapped = NULL; @@ -253,16 +373,24 @@ PHP_METHOD(Channel, close) { } static zend_function_entry channel_methods[] = { - PHP_ME(Channel, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Channel, getTarget, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Channel, getConnectivityState, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Channel, watchConnectivityState, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Channel, close, NULL, ZEND_ACC_PUBLIC) - PHP_FE_END}; + PHP_ME(Channel, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) + PHP_ME(Channel, getTarget, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Channel, getConnectivityState, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Channel, watchConnectivityState, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Channel, close, NULL, ZEND_ACC_PUBLIC) + PHP_FE_END +}; void grpc_init_channel(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\Channel", channel_methods); ce.create_object = create_wrapped_grpc_channel; grpc_ce_channel = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&channel_ce_handlers, zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + channel_ce_handlers.offset = + XtOffsetOf(wrapped_grpc_channel, std); + channel_ce_handlers.free_obj = free_wrapped_grpc_channel; +#endif } diff --git a/src/php/ext/grpc/channel.h b/src/php/ext/grpc/channel.h index cc5823ee7f..ea5efeaf86 100755 --- a/src/php/ext/grpc/channel.h +++ b/src/php/ext/grpc/channel.h @@ -48,17 +48,38 @@ /* Class entry for the PHP Channel class */ extern zend_class_entry *grpc_ce_channel; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_channel that can be associated with a PHP object */ typedef struct wrapped_grpc_channel { zend_object std; + grpc_channel *wrapped; +} wrapped_grpc_channel; +#else + +/* Wrapper struct for grpc_channel that can be associated with a PHP object */ +typedef struct wrapped_grpc_channel { grpc_channel *wrapped; + zend_object std; } wrapped_grpc_channel; +static inline wrapped_grpc_channel +*wrapped_grpc_channel_from_obj(zend_object *obj) { + return (wrapped_grpc_channel*)((char*)(obj) - + XtOffsetOf(wrapped_grpc_channel, std)); +} + +#define Z_WRAPPED_GRPC_CHANNEL_P(zv) \ + wrapped_grpc_channel_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Initializes the Channel class */ void grpc_init_channel(TSRMLS_D); /* Iterates through a PHP array and populates args with the contents */ -void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args TSRMLS_DC); +void php_grpc_read_args_array(zval *args_array, grpc_channel_args *args + TSRMLS_DC); #endif /* NET_GRPC_PHP_GRPC_CHANNEL_H_ */ diff --git a/src/php/ext/grpc/channel_credentials.c b/src/php/ext/grpc/channel_credentials.c index b76fb105f3..16ba0368eb 100644 --- a/src/php/ext/grpc/channel_credentials.c +++ b/src/php/ext/grpc/channel_credentials.c @@ -52,7 +52,6 @@ #include <grpc/grpc_security.h> zend_class_entry *grpc_ce_channel_credentials; - static char *default_pem_root_certs = NULL; static grpc_ssl_roots_override_result get_ssl_roots_override( @@ -64,6 +63,8 @@ static grpc_ssl_roots_override_result get_ssl_roots_override( return GRPC_SSL_ROOTS_OVERRIDE_OK; } +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_channel_credentials */ void free_wrapped_grpc_channel_credentials(void *object TSRMLS_DC) { wrapped_grpc_channel_credentials *creds = @@ -71,6 +72,7 @@ void free_wrapped_grpc_channel_credentials(void *object TSRMLS_DC) { if (creds->wrapped != NULL) { grpc_channel_credentials_release(creds->wrapped); } + zend_object_std_dtor(&creds->std TSRMLS_CC); efree(creds); } @@ -94,7 +96,8 @@ zend_object_value create_wrapped_grpc_channel_credentials( return retval; } -zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS_DC) { +zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials + *wrapped TSRMLS_DC) { zval *credentials_object; MAKE_STD_ZVAL(credentials_object); object_init_ex(credentials_object, grpc_ce_channel_credentials); @@ -105,6 +108,43 @@ zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS return credentials_object; } +#else + +static zend_object_handlers channel_credentials_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_channel_credentials */ +static void free_wrapped_grpc_channel_credentials(zend_object *object) { + wrapped_grpc_channel_credentials *creds = + wrapped_grpc_channel_creds_from_obj(object); + if (creds->wrapped != NULL) { + grpc_channel_credentials_release(creds->wrapped); + } + zend_object_std_dtor(&creds->std); +} + +/* Initializes an instance of wrapped_grpc_channel_credentials to be + * associated with an object of a class specified by class_type */ +zend_object *create_wrapped_grpc_channel_credentials(zend_class_entry + *class_type) { + wrapped_grpc_channel_credentials *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_channel_credentials) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &channel_credentials_ce_handlers; + return &intern->std; +} + +void grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped, + zval *credentials_object) { + object_init_ex(credentials_object, grpc_ce_channel_credentials); + wrapped_grpc_channel_credentials *credentials = + Z_WRAPPED_GRPC_CHANNEL_CREDS_P(credentials_object); + credentials->wrapped = wrapped; +} + +#endif + /** * Set default roots pem. * @param string pem_roots PEM encoding of the server root certificates @@ -112,7 +152,13 @@ zval *grpc_php_wrap_channel_credentials(grpc_channel_credentials *wrapped TSRMLS */ PHP_METHOD(ChannelCredentials, setDefaultRootsPem) { char *pem_roots; +#if PHP_MAJOR_VERSION < 7 int pem_roots_length; +#else + size_t pem_roots_length; +#endif + + /* "s" == 1 string */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &pem_roots, &pem_roots_length) == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, @@ -129,8 +175,13 @@ PHP_METHOD(ChannelCredentials, setDefaultRootsPem) { */ PHP_METHOD(ChannelCredentials, createDefault) { grpc_channel_credentials *creds = grpc_google_default_credentials_create(); +#if PHP_MAJOR_VERSION < 7 zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + grpc_php_wrap_channel_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -146,11 +197,15 @@ PHP_METHOD(ChannelCredentials, createSsl) { char *pem_root_certs = NULL; grpc_ssl_pem_key_cert_pair pem_key_cert_pair; +#if PHP_MAJOR_VERSION < 7 int root_certs_length = 0, private_key_length = 0, cert_chain_length = 0; +#else + size_t root_certs_length = 0, private_key_length = 0, cert_chain_length = 0; +#endif pem_key_cert_pair.private_key = pem_key_cert_pair.cert_chain = NULL; - /* "|s!s!s! == 3 optional nullable strings */ + /* "|s!s!s!" == 3 optional nullable strings */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|s!s!s!", &pem_root_certs, &root_certs_length, &pem_key_cert_pair.private_key, @@ -164,8 +219,13 @@ PHP_METHOD(ChannelCredentials, createSsl) { grpc_channel_credentials *creds = grpc_ssl_credentials_create( pem_root_certs, pem_key_cert_pair.private_key == NULL ? NULL : &pem_key_cert_pair, NULL); +#if PHP_MAJOR_VERSION < 7 zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + grpc_php_wrap_channel_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -178,7 +238,7 @@ PHP_METHOD(ChannelCredentials, createComposite) { zval *cred1_obj; zval *cred2_obj; - /* "OO" == 3 Objects */ + /* "OO" == 2 Objects */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OO", &cred1_obj, grpc_ce_channel_credentials, &cred2_obj, grpc_ce_call_credentials) == FAILURE) { @@ -186,6 +246,7 @@ PHP_METHOD(ChannelCredentials, createComposite) { "createComposite expects 2 Credentials", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_channel_credentials *cred1 = (wrapped_grpc_channel_credentials *)zend_object_store_get_object( cred1_obj TSRMLS_CC); @@ -197,6 +258,17 @@ PHP_METHOD(ChannelCredentials, createComposite) { NULL); zval *creds_object = grpc_php_wrap_channel_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + wrapped_grpc_channel_credentials *cred1 = + Z_WRAPPED_GRPC_CHANNEL_CREDS_P(cred1_obj); + wrapped_grpc_call_credentials *cred2 = + Z_WRAPPED_GRPC_CALL_CREDS_P(cred2_obj); + grpc_channel_credentials *creds = + grpc_composite_channel_credentials_create(cred1->wrapped, + cred2->wrapped, NULL); + grpc_php_wrap_channel_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -218,7 +290,8 @@ static zend_function_entry channel_credentials_methods[] = { ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_ME(ChannelCredentials, createInsecure, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_FE_END}; + PHP_FE_END +}; void grpc_init_channel_credentials(TSRMLS_D) { zend_class_entry ce; @@ -227,4 +300,13 @@ void grpc_init_channel_credentials(TSRMLS_D) { grpc_set_ssl_roots_override_callback(get_ssl_roots_override); ce.create_object = create_wrapped_grpc_channel_credentials; grpc_ce_channel_credentials = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&channel_credentials_ce_handlers, + zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + channel_credentials_ce_handlers.offset = + XtOffsetOf(wrapped_grpc_channel_credentials, std); + channel_credentials_ce_handlers.free_obj = + free_wrapped_grpc_channel_credentials; +#endif } diff --git a/src/php/ext/grpc/channel_credentials.h b/src/php/ext/grpc/channel_credentials.h index d89984ce60..44071b10f1 100755 --- a/src/php/ext/grpc/channel_credentials.h +++ b/src/php/ext/grpc/channel_credentials.h @@ -49,14 +49,37 @@ /* Class entry for the ChannelCredentials PHP class */ extern zend_class_entry *grpc_ce_channel_credentials; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_channel_credentials that can be associated * with a PHP object */ typedef struct wrapped_grpc_channel_credentials { zend_object std; + grpc_channel_credentials *wrapped; +} wrapped_grpc_channel_credentials; +#else + +/* Wrapper struct for grpc_channel_credentials that can be associated + * with a PHP object */ +typedef struct wrapped_grpc_channel_credentials { grpc_channel_credentials *wrapped; + zend_object std; } wrapped_grpc_channel_credentials; +static inline wrapped_grpc_channel_credentials +*wrapped_grpc_channel_creds_from_obj(zend_object *obj) { + return + (wrapped_grpc_channel_credentials *) + ((char*)(obj) - + XtOffsetOf(wrapped_grpc_channel_credentials, std)); +} + +#define Z_WRAPPED_GRPC_CHANNEL_CREDS_P(zv) \ + wrapped_grpc_channel_creds_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Initializes the ChannelCredentials PHP class */ void grpc_init_channel_credentials(TSRMLS_D); diff --git a/src/php/ext/grpc/php_grpc.c b/src/php/ext/grpc/php_grpc.c index 449ba3cd47..5edfa2da7d 100644 --- a/src/php/ext/grpc/php_grpc.c +++ b/src/php/ext/grpc/php_grpc.c @@ -64,15 +64,19 @@ const zend_function_entry grpc_functions[] = { */ zend_module_entry grpc_module_entry = { #if ZEND_MODULE_API_NO >= 20010901 - STANDARD_MODULE_HEADER, + STANDARD_MODULE_HEADER, #endif - "grpc", grpc_functions, PHP_MINIT(grpc), - PHP_MSHUTDOWN(grpc), NULL, NULL, - PHP_MINFO(grpc), + "grpc", + grpc_functions, + PHP_MINIT(grpc), + PHP_MSHUTDOWN(grpc), + NULL, + NULL, + PHP_MINFO(grpc), #if ZEND_MODULE_API_NO >= 20010901 - PHP_GRPC_VERSION, + PHP_GRPC_VERSION, #endif - STANDARD_MODULE_PROPERTIES}; + STANDARD_MODULE_PROPERTIES}; /* }}} */ #ifdef COMPILE_DL_GRPC @@ -82,23 +86,24 @@ ZEND_GET_MODULE(grpc) /* {{{ PHP_INI */ /* Remove comments and fill if you need to have entries in php.ini -PHP_INI_BEGIN() - STD_PHP_INI_ENTRY("grpc.global_value", "42", PHP_INI_ALL, OnUpdateLong, -global_value, zend_grpc_globals, grpc_globals) - STD_PHP_INI_ENTRY("grpc.global_string", "foobar", PHP_INI_ALL, -OnUpdateString, global_string, zend_grpc_globals, grpc_globals) -PHP_INI_END() + PHP_INI_BEGIN() + STD_PHP_INI_ENTRY("grpc.global_value", "42", PHP_INI_ALL, OnUpdateLong, + global_value, zend_grpc_globals, grpc_globals) + STD_PHP_INI_ENTRY("grpc.global_string", "foobar", PHP_INI_ALL, + OnUpdateString, global_string, zend_grpc_globals, + grpc_globals) + PHP_INI_END() */ /* }}} */ /* {{{ php_grpc_init_globals */ /* Uncomment this function if you have INI entries -static void php_grpc_init_globals(zend_grpc_globals *grpc_globals) -{ - grpc_globals->global_value = 0; - grpc_globals->global_string = NULL; -} + static void php_grpc_init_globals(zend_grpc_globals *grpc_globals) + { + grpc_globals->global_value = 0; + grpc_globals->global_string = NULL; + } */ /* }}} */ @@ -106,7 +111,7 @@ static void php_grpc_init_globals(zend_grpc_globals *grpc_globals) */ PHP_MINIT_FUNCTION(grpc) { /* If you have INI entries, uncomment these lines - REGISTER_INI_ENTRIES(); + REGISTER_INI_ENTRIES(); */ /* Register call error constants */ grpc_init(); @@ -246,7 +251,7 @@ PHP_MINIT_FUNCTION(grpc) { */ PHP_MSHUTDOWN_FUNCTION(grpc) { /* uncomment this line if you have INI entries - UNREGISTER_INI_ENTRIES(); + UNREGISTER_INI_ENTRIES(); */ // WARNING: This function IS being called by PHP when the extension // is unloaded but the logs were somehow suppressed. @@ -265,7 +270,7 @@ PHP_MINFO_FUNCTION(grpc) { php_info_print_table_end(); /* Remove comments if you have entries in php.ini - DISPLAY_INI_ENTRIES(); + DISPLAY_INI_ENTRIES(); */ } /* }}} */ @@ -274,12 +279,3 @@ PHP_MINFO_FUNCTION(grpc) { function definition, where the functions purpose is also documented. Please follow this convention for the convenience of others editing your code. */ - -/* - * Local variables: - * tab-width: 4 - * c-basic-offset: 4 - * End: - * vim600: noet sw=4 ts=4 fdm=marker - * vim<600: noet sw=4 ts=4 - */ diff --git a/src/php/ext/grpc/php_grpc.h b/src/php/ext/grpc/php_grpc.h index 1d4834c50f..bd7ee75a6f 100644 --- a/src/php/ext/grpc/php_grpc.h +++ b/src/php/ext/grpc/php_grpc.h @@ -72,8 +72,8 @@ PHP_MSHUTDOWN_FUNCTION(grpc); PHP_MINFO_FUNCTION(grpc); /* - Declare any global variables you may need between the BEGIN - and END macros here: + Declare any global variables you may need between the BEGIN + and END macros here: ZEND_BEGIN_MODULE_GLOBALS(grpc) ZEND_END_MODULE_GLOBALS(grpc) diff --git a/src/php/ext/grpc/server.c b/src/php/ext/grpc/server.c index c13e7cd1f9..50fb2d0cf9 100644 --- a/src/php/ext/grpc/server.c +++ b/src/php/ext/grpc/server.c @@ -58,6 +58,8 @@ zend_class_entry *grpc_ce_server; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_server */ void free_wrapped_grpc_server(void *object TSRMLS_DC) { wrapped_grpc_server *server = (wrapped_grpc_server *)object; @@ -68,6 +70,7 @@ void free_wrapped_grpc_server(void *object TSRMLS_DC) { gpr_inf_future(GPR_CLOCK_REALTIME), NULL); grpc_server_destroy(server->wrapped); } + zend_object_std_dtor(&server->std TSRMLS_CC); efree(server); } @@ -90,15 +93,51 @@ zend_object_value create_wrapped_grpc_server(zend_class_entry *class_type return retval; } +#else + +static zend_object_handlers server_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_server */ +static void free_wrapped_grpc_server(zend_object *object) { + wrapped_grpc_server *server = wrapped_grpc_server_from_obj(object); + if (server->wrapped != NULL) { + grpc_server_shutdown_and_notify(server->wrapped, completion_queue, NULL); + grpc_server_cancel_all_calls(server->wrapped); + grpc_completion_queue_pluck(completion_queue, NULL, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + grpc_server_destroy(server->wrapped); + } + zend_object_std_dtor(&server->std); +} + +/* Initializes an instance of wrapped_grpc_call to be associated with an object + * of a class specified by class_type */ +zend_object *create_wrapped_grpc_server(zend_class_entry *class_type) { + wrapped_grpc_server *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_server) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &server_ce_handlers; + return &intern->std; +} + +#endif + /** * Constructs a new instance of the Server class * @param array $args The arguments to pass to the server (optional) */ PHP_METHOD(Server, __construct) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_server *server = (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_server *server = Z_WRAPPED_GRPC_SERVER_P(getThis()); +#endif zval *args_array = NULL; grpc_channel_args args; + /* "|a" == 1 optional array */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "|a", &args_array) == FAILURE) { @@ -110,6 +149,8 @@ PHP_METHOD(Server, __construct) { if (args_array == NULL) { server->wrapped = grpc_server_create(NULL, NULL); } else { + //TODO(thinkerou): deal it if key of array is long, crash now on php7 + // and update unit test case php_grpc_read_args_array(args_array, &args TSRMLS_CC); server->wrapped = grpc_server_create(&args, NULL); efree(args.args); @@ -126,15 +167,22 @@ PHP_METHOD(Server, __construct) { */ PHP_METHOD(Server, requestCall) { grpc_call_error error_code; - wrapped_grpc_server *server = - (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); grpc_call *call; grpc_call_details details; grpc_metadata_array metadata; - zval *result; grpc_event event; + +#if PHP_MAJOR_VERSION < 7 + wrapped_grpc_server *server = + (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); + zval *result; MAKE_STD_ZVAL(result); object_init(result); +#else + wrapped_grpc_server *server = Z_WRAPPED_GRPC_SERVER_P(getThis()); + object_init(return_value); +#endif + grpc_call_details_init(&details); grpc_metadata_array_init(&metadata); error_code = @@ -146,23 +194,48 @@ PHP_METHOD(Server, requestCall) { goto cleanup; } event = grpc_completion_queue_pluck(completion_queue, NULL, - gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + gpr_inf_future(GPR_CLOCK_REALTIME), + NULL); if (!event.success) { zend_throw_exception(spl_ce_LogicException, "Failed to request a call for some reason", 1 TSRMLS_CC); goto cleanup; } +#if PHP_MAJOR_VERSION < 7 add_property_zval(result, "call", grpc_php_wrap_call(call, true TSRMLS_CC)); add_property_string(result, "method", details.method, true); add_property_string(result, "host", details.host, true); add_property_zval(result, "absolute_deadline", grpc_php_wrap_timeval(details.deadline TSRMLS_CC)); - add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata TSRMLS_CC)); + add_property_zval(result, "metadata", grpc_parse_metadata_array(&metadata + TSRMLS_CC)); + cleanup: grpc_call_details_destroy(&details); grpc_metadata_array_destroy(&metadata); RETURN_DESTROY_ZVAL(result); + +#else + + zval zv_call; + zval zv_timeval; + zval zv_md; + grpc_php_wrap_call(call, true, &zv_call); + grpc_php_wrap_timeval(details.deadline, &zv_timeval); + grpc_parse_metadata_array(&metadata, &zv_md); + + add_property_zval(return_value, "call", &zv_call); + add_property_string(return_value, "method", details.method); + add_property_string(return_value, "host", details.host); + add_property_zval(return_value, "absolute_deadline", &zv_timeval); + add_property_zval(return_value, "metadata", &zv_md); + + cleanup: + grpc_call_details_destroy(&details); + grpc_metadata_array_destroy(&metadata); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -171,13 +244,19 @@ cleanup: * @return true on success, false on failure */ PHP_METHOD(Server, addHttp2Port) { - wrapped_grpc_server *server = - (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); const char *addr; +#if PHP_MAJOR_VERSION < 7 int addr_len; + wrapped_grpc_server *server = + (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + size_t addr_len; + wrapped_grpc_server *server = Z_WRAPPED_GRPC_SERVER_P(getThis()); +#endif + /* "s" == 1 string */ - if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &addr, &addr_len) == - FAILURE) { + if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s", &addr, &addr_len) + == FAILURE) { zend_throw_exception(spl_ce_InvalidArgumentException, "add_http2_port expects a string", 1 TSRMLS_CC); return; @@ -186,11 +265,17 @@ PHP_METHOD(Server, addHttp2Port) { } PHP_METHOD(Server, addSecureHttp2Port) { - wrapped_grpc_server *server = - (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); const char *addr; - int addr_len; zval *creds_obj; +#if PHP_MAJOR_VERSION < 7 + int addr_len; + wrapped_grpc_server *server = + (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + size_t addr_len; + wrapped_grpc_server *server = Z_WRAPPED_GRPC_SERVER_P(getThis()); +#endif + /* "sO" == 1 string, 1 object */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sO", &addr, &addr_len, &creds_obj, grpc_ce_server_credentials) == @@ -200,9 +285,14 @@ PHP_METHOD(Server, addSecureHttp2Port) { "add_http2_port expects a string and a ServerCredentials", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_server_credentials *creds = (wrapped_grpc_server_credentials *)zend_object_store_get_object( creds_obj TSRMLS_CC); +#else + wrapped_grpc_server_credentials *creds = + Z_WRAPPED_GRPC_SERVER_CREDS_P(creds_obj); +#endif RETURN_LONG(grpc_server_add_secure_http2_port(server->wrapped, addr, creds->wrapped)); } @@ -212,21 +302,33 @@ PHP_METHOD(Server, addSecureHttp2Port) { * @return Void */ PHP_METHOD(Server, start) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_server *server = (wrapped_grpc_server *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_server *server = Z_WRAPPED_GRPC_SERVER_P(getThis()); +#endif grpc_server_start(server->wrapped); } static zend_function_entry server_methods[] = { - PHP_ME(Server, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Server, requestCall, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Server, addHttp2Port, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Server, addSecureHttp2Port, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Server, start, NULL, ZEND_ACC_PUBLIC) PHP_FE_END}; + PHP_ME(Server, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) + PHP_ME(Server, requestCall, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, addHttp2Port, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, addSecureHttp2Port, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Server, start, NULL, ZEND_ACC_PUBLIC) + PHP_FE_END +}; void grpc_init_server(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\Server", server_methods); ce.create_object = create_wrapped_grpc_server; grpc_ce_server = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&server_ce_handlers, zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + server_ce_handlers.offset = XtOffsetOf(wrapped_grpc_server, std); + server_ce_handlers.free_obj = free_wrapped_grpc_server; +#endif } diff --git a/src/php/ext/grpc/server.h b/src/php/ext/grpc/server.h index 022257f37c..a7df456a11 100755 --- a/src/php/ext/grpc/server.h +++ b/src/php/ext/grpc/server.h @@ -48,13 +48,33 @@ /* Class entry for the Server PHP class */ extern zend_class_entry *grpc_ce_server; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_server that can be associated with a PHP object */ typedef struct wrapped_grpc_server { zend_object std; + grpc_server *wrapped; +} wrapped_grpc_server; +#else + +/* Wrapper struct for grpc_server that can be associated with a PHP object */ +typedef struct wrapped_grpc_server { grpc_server *wrapped; + zend_object std; } wrapped_grpc_server; +static inline wrapped_grpc_server +*wrapped_grpc_server_from_obj(zend_object *obj) { + return (wrapped_grpc_server*)((char*)(obj) - + XtOffsetOf(wrapped_grpc_server, std)); +} + +#define Z_WRAPPED_GRPC_SERVER_P(zv) \ + wrapped_grpc_server_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Initializes the Server class */ void grpc_init_server(TSRMLS_D); diff --git a/src/php/ext/grpc/server_credentials.c b/src/php/ext/grpc/server_credentials.c index 505da10a28..296632d5bd 100644 --- a/src/php/ext/grpc/server_credentials.c +++ b/src/php/ext/grpc/server_credentials.c @@ -51,6 +51,8 @@ zend_class_entry *grpc_ce_server_credentials; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instace of wrapped_grpc_server_credentials */ void free_wrapped_grpc_server_credentials(void *object TSRMLS_DC) { wrapped_grpc_server_credentials *creds = @@ -58,6 +60,7 @@ void free_wrapped_grpc_server_credentials(void *object TSRMLS_DC) { if (creds->wrapped != NULL) { grpc_server_credentials_release(creds->wrapped); } + zend_object_std_dtor(&creds->std TSRMLS_CC); efree(creds); } @@ -81,7 +84,8 @@ zend_object_value create_wrapped_grpc_server_credentials( return retval; } -zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped TSRMLS_DC) { +zval *grpc_php_wrap_server_credentials(grpc_server_credentials + *wrapped TSRMLS_DC) { zval *server_credentials_object; MAKE_STD_ZVAL(server_credentials_object); object_init_ex(server_credentials_object, grpc_ce_server_credentials); @@ -92,6 +96,43 @@ zval *grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped TSRMLS_D return server_credentials_object; } +#else + +static zend_object_handlers server_credentials_ce_handlers; + +/* Frees and destroys an instace of wrapped_grpc_server_credentials */ +static void free_wrapped_grpc_server_credentials(zend_object *object) { + wrapped_grpc_server_credentials *creds = + wrapped_grpc_server_creds_from_obj(object); + if (creds->wrapped != NULL) { + grpc_server_credentials_release(creds->wrapped); + } + zend_object_std_dtor(&creds->std); +} + +/* Initializes an instace of wrapped_grpc_server_credentials to be associated + * with an object of a class specified by class_type */ +zend_object *create_wrapped_grpc_server_credentials(zend_class_entry + *class_type) { + wrapped_grpc_server_credentials *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_server_credentials) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &server_credentials_ce_handlers; + return &intern->std; +} + +void grpc_php_wrap_server_credentials(grpc_server_credentials *wrapped, + zval *server_credentials_object) { + object_init_ex(server_credentials_object, grpc_ce_server_credentials); + wrapped_grpc_server_credentials *server_credentials = + Z_WRAPPED_GRPC_SERVER_CREDS_P(server_credentials_object); + server_credentials->wrapped = wrapped; +} + +#endif + /** * Create SSL credentials. * @param string pem_root_certs PEM encoding of the server root certificates @@ -103,7 +144,11 @@ PHP_METHOD(ServerCredentials, createSsl) { char *pem_root_certs = 0; grpc_ssl_pem_key_cert_pair pem_key_cert_pair; +#if PHP_MAJOR_VERSION < 7 int root_certs_length = 0, private_key_length, cert_chain_length; +#else + size_t root_certs_length = 0, private_key_length, cert_chain_length; +#endif /* "s!ss" == 1 nullable string, 2 strings */ /* TODO: support multiple key cert pairs. */ @@ -120,17 +165,33 @@ PHP_METHOD(ServerCredentials, createSsl) { grpc_server_credentials *creds = grpc_ssl_server_credentials_create_ex( pem_root_certs, &pem_key_cert_pair, 1, GRPC_SSL_DONT_REQUEST_CLIENT_CERTIFICATE, NULL); +#if PHP_MAJOR_VERSION < 7 zval *creds_object = grpc_php_wrap_server_credentials(creds TSRMLS_CC); RETURN_DESTROY_ZVAL(creds_object); +#else + grpc_php_wrap_server_credentials(creds, return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } static zend_function_entry server_credentials_methods[] = { - PHP_ME(ServerCredentials, createSsl, NULL, - ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END}; + PHP_ME(ServerCredentials, createSsl, NULL, + ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_FE_END + }; void grpc_init_server_credentials(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\ServerCredentials", server_credentials_methods); ce.create_object = create_wrapped_grpc_server_credentials; grpc_ce_server_credentials = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&server_credentials_ce_handlers, + zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + server_credentials_ce_handlers.offset = + XtOffsetOf(wrapped_grpc_server_credentials, std); + server_credentials_ce_handlers.free_obj = + free_wrapped_grpc_server_credentials; +#endif } diff --git a/src/php/ext/grpc/server_credentials.h b/src/php/ext/grpc/server_credentials.h index 7101d65000..d37fafc0dc 100755 --- a/src/php/ext/grpc/server_credentials.h +++ b/src/php/ext/grpc/server_credentials.h @@ -49,14 +49,34 @@ /* Class entry for the Server_Credentials PHP class */ extern zend_class_entry *grpc_ce_server_credentials; +#if PHP_MAJOR_VERSION < 7 + /* Wrapper struct for grpc_server_credentials that can be associated with a PHP * object */ typedef struct wrapped_grpc_server_credentials { zend_object std; + grpc_server_credentials *wrapped; +} wrapped_grpc_server_credentials; +#else + +typedef struct wrapped_grpc_server_credentials { grpc_server_credentials *wrapped; + zend_object std; } wrapped_grpc_server_credentials; +static inline wrapped_grpc_server_credentials +*wrapped_grpc_server_creds_from_obj(zend_object *obj) { + return (wrapped_grpc_server_credentials*) + ((char*)(obj) - + XtOffsetOf(wrapped_grpc_server_credentials, std)); +} + +#define Z_WRAPPED_GRPC_SERVER_CREDS_P(zv) \ + wrapped_grpc_server_creds_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Initializes the Server_Credentials PHP class */ void grpc_init_server_credentials(TSRMLS_D); diff --git a/src/php/ext/grpc/timeval.c b/src/php/ext/grpc/timeval.c index 5e242162a8..8dc997c2dd 100644 --- a/src/php/ext/grpc/timeval.c +++ b/src/php/ext/grpc/timeval.c @@ -52,8 +52,14 @@ zend_class_entry *grpc_ce_timeval; +#if PHP_MAJOR_VERSION < 7 + /* Frees and destroys an instance of wrapped_grpc_call */ -void free_wrapped_grpc_timeval(void *object TSRMLS_DC) { efree(object); } +void free_wrapped_grpc_timeval(void *object TSRMLS_DC) { + wrapped_grpc_timeval *timeval = (wrapped_grpc_timeval *)object; + zend_object_std_dtor(&timeval->std TSRMLS_CC); + efree(timeval); +} /* Initializes an instance of wrapped_grpc_timeval to be associated with an * object of a class specified by class_type */ @@ -83,14 +89,50 @@ zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC) { return timeval_object; } +#else + +static zend_object_handlers timeval_ce_handlers; + +/* Frees and destroys an instance of wrapped_grpc_call */ +static void free_wrapped_grpc_timeval(zend_object *object) { + wrapped_grpc_timeval *timeval = wrapped_grpc_timeval_from_obj(object); + zend_object_std_dtor(&timeval->std); +} + +/* Initializes an instance of wrapped_grpc_timeval to be associated with an + * object of a class specified by class_type */ +zend_object *create_wrapped_grpc_timeval(zend_class_entry *class_type) { + wrapped_grpc_timeval *intern; + intern = ecalloc(1, sizeof(wrapped_grpc_timeval) + + zend_object_properties_size(class_type)); + zend_object_std_init(&intern->std, class_type); + object_properties_init(&intern->std, class_type); + intern->std.handlers = &timeval_ce_handlers; + return &intern->std; +} + +void grpc_php_wrap_timeval(gpr_timespec wrapped, zval *timeval_object) { + object_init_ex(timeval_object, grpc_ce_timeval); + wrapped_grpc_timeval *timeval = Z_WRAPPED_GRPC_TIMEVAL_P(timeval_object); + memcpy(&timeval->wrapped, &wrapped, sizeof(gpr_timespec)); +} + +#endif + /** * Constructs a new instance of the Timeval class * @param long $usec The number of microseconds in the interval */ PHP_METHOD(Timeval, __construct) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *timeval = (wrapped_grpc_timeval *)zend_object_store_get_object(getThis() TSRMLS_CC); long microseconds; +#else + wrapped_grpc_timeval *timeval = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); + zend_long microseconds; +#endif + /* "l" == 1 long */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "l", µseconds) == FAILURE) { @@ -110,6 +152,7 @@ PHP_METHOD(Timeval, __construct) { */ PHP_METHOD(Timeval, add) { zval *other_obj; + /* "O" == 1 Object */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &other_obj, grpc_ce_timeval) == FAILURE) { @@ -117,13 +160,23 @@ PHP_METHOD(Timeval, add) { "add expects a Timeval", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *self = (wrapped_grpc_timeval *)zend_object_store_get_object(getThis() TSRMLS_CC); wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *sum = - grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) TSRMLS_CC); + grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped) + TSRMLS_CC); RETURN_DESTROY_ZVAL(sum); +#else + wrapped_grpc_timeval *self = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); + wrapped_grpc_timeval *other = Z_WRAPPED_GRPC_TIMEVAL_P(other_obj); + + grpc_php_wrap_timeval(gpr_time_add(self->wrapped, other->wrapped), + return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -134,6 +187,7 @@ PHP_METHOD(Timeval, add) { */ PHP_METHOD(Timeval, subtract) { zval *other_obj; + /* "O" == 1 Object */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "O", &other_obj, grpc_ce_timeval) == FAILURE) { @@ -141,13 +195,22 @@ PHP_METHOD(Timeval, subtract) { "subtract expects a Timeval", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *self = (wrapped_grpc_timeval *)zend_object_store_get_object(getThis() TSRMLS_CC); wrapped_grpc_timeval *other = (wrapped_grpc_timeval *)zend_object_store_get_object(other_obj TSRMLS_CC); zval *diff = - grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) TSRMLS_CC); + grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped) + TSRMLS_CC); RETURN_DESTROY_ZVAL(diff); +#else + wrapped_grpc_timeval *self = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); + wrapped_grpc_timeval *other = Z_WRAPPED_GRPC_TIMEVAL_P(other_obj); + grpc_php_wrap_timeval(gpr_time_sub(self->wrapped, other->wrapped), + return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -158,7 +221,9 @@ PHP_METHOD(Timeval, subtract) { * @return long */ PHP_METHOD(Timeval, compare) { - zval *a_obj, *b_obj; + zval *a_obj; + zval *b_obj; + /* "OO" == 2 Objects */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OO", &a_obj, grpc_ce_timeval, &b_obj, @@ -167,10 +232,15 @@ PHP_METHOD(Timeval, compare) { "compare expects two Timevals", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *a = (wrapped_grpc_timeval *)zend_object_store_get_object(a_obj TSRMLS_CC); wrapped_grpc_timeval *b = (wrapped_grpc_timeval *)zend_object_store_get_object(b_obj TSRMLS_CC); +#else + wrapped_grpc_timeval *a = Z_WRAPPED_GRPC_TIMEVAL_P(a_obj); + wrapped_grpc_timeval *b = Z_WRAPPED_GRPC_TIMEVAL_P(b_obj); +#endif long result = gpr_time_cmp(a->wrapped, b->wrapped); RETURN_LONG(result); } @@ -183,7 +253,10 @@ PHP_METHOD(Timeval, compare) { * @return bool True if $a and $b are within $threshold, False otherwise */ PHP_METHOD(Timeval, similar) { - zval *a_obj, *b_obj, *thresh_obj; + zval *a_obj; + zval *b_obj; + zval *thresh_obj; + /* "OOO" == 3 Objects */ if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "OOO", &a_obj, grpc_ce_timeval, &b_obj, grpc_ce_timeval, @@ -192,6 +265,7 @@ PHP_METHOD(Timeval, similar) { "compare expects three Timevals", 1 TSRMLS_CC); return; } +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *a = (wrapped_grpc_timeval *)zend_object_store_get_object(a_obj TSRMLS_CC); wrapped_grpc_timeval *b = @@ -199,6 +273,11 @@ PHP_METHOD(Timeval, similar) { wrapped_grpc_timeval *thresh = (wrapped_grpc_timeval *)zend_object_store_get_object( thresh_obj TSRMLS_CC); +#else + wrapped_grpc_timeval *a = Z_WRAPPED_GRPC_TIMEVAL_P(a_obj); + wrapped_grpc_timeval *b = Z_WRAPPED_GRPC_TIMEVAL_P(b_obj); + wrapped_grpc_timeval *thresh = Z_WRAPPED_GRPC_TIMEVAL_P(thresh_obj); +#endif int result = gpr_time_similar(a->wrapped, b->wrapped, thresh->wrapped); RETURN_BOOL(result); } @@ -208,8 +287,13 @@ PHP_METHOD(Timeval, similar) { * @return Timeval The current time */ PHP_METHOD(Timeval, now) { +#if PHP_MAJOR_VERSION < 7 zval *now = grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(now); +#else + grpc_php_wrap_timeval(gpr_now(GPR_CLOCK_REALTIME), return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -217,11 +301,18 @@ PHP_METHOD(Timeval, now) { * @return Timeval Zero length time interval */ PHP_METHOD(Timeval, zero) { +#if PHP_MAJOR_VERSION < 7 zval *grpc_php_timeval_zero = grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_ZVAL(grpc_php_timeval_zero, false, /* Copy original before returning? */ true /* Destroy original before returning */); +#else + grpc_php_wrap_timeval(gpr_time_0(GPR_CLOCK_REALTIME), return_value); + RETURN_ZVAL(return_value, + false, /* Copy original before returning? */ + true /* Destroy original before returning */); +#endif } /** @@ -229,9 +320,14 @@ PHP_METHOD(Timeval, zero) { * @return Timeval Infinite future time value */ PHP_METHOD(Timeval, infFuture) { +#if PHP_MAJOR_VERSION < 7 zval *grpc_php_timeval_inf_future = grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_future); +#else + grpc_php_wrap_timeval(gpr_inf_future(GPR_CLOCK_REALTIME), return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -239,9 +335,14 @@ PHP_METHOD(Timeval, infFuture) { * @return Timeval Infinite past time value */ PHP_METHOD(Timeval, infPast) { +#if PHP_MAJOR_VERSION < 7 zval *grpc_php_timeval_inf_past = grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME) TSRMLS_CC); RETURN_DESTROY_ZVAL(grpc_php_timeval_inf_past); +#else + grpc_php_wrap_timeval(gpr_inf_past(GPR_CLOCK_REALTIME), return_value); + RETURN_DESTROY_ZVAL(return_value); +#endif } /** @@ -249,28 +350,41 @@ PHP_METHOD(Timeval, infPast) { * @return void */ PHP_METHOD(Timeval, sleepUntil) { +#if PHP_MAJOR_VERSION < 7 wrapped_grpc_timeval *this = (wrapped_grpc_timeval *)zend_object_store_get_object(getThis() TSRMLS_CC); +#else + wrapped_grpc_timeval *this = Z_WRAPPED_GRPC_TIMEVAL_P(getThis()); +#endif gpr_sleep_until(this->wrapped); } static zend_function_entry timeval_methods[] = { - PHP_ME(Timeval, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) - PHP_ME(Timeval, add, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Timeval, compare, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, infFuture, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, infPast, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, now, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, similar, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) - PHP_ME(Timeval, sleepUntil, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Timeval, subtract, NULL, ZEND_ACC_PUBLIC) - PHP_ME(Timeval, zero, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) PHP_FE_END}; + PHP_ME(Timeval, __construct, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_CTOR) + PHP_ME(Timeval, add, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Timeval, compare, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, infFuture, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, infPast, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, now, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, similar, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_ME(Timeval, sleepUntil, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Timeval, subtract, NULL, ZEND_ACC_PUBLIC) + PHP_ME(Timeval, zero, NULL, ZEND_ACC_PUBLIC | ZEND_ACC_STATIC) + PHP_FE_END +}; void grpc_init_timeval(TSRMLS_D) { zend_class_entry ce; INIT_CLASS_ENTRY(ce, "Grpc\\Timeval", timeval_methods); ce.create_object = create_wrapped_grpc_timeval; grpc_ce_timeval = zend_register_internal_class(&ce TSRMLS_CC); +#if PHP_MAJOR_VERSION >= 7 + memcpy(&timeval_ce_handlers, zend_get_std_object_handlers(), + sizeof(zend_object_handlers)); + timeval_ce_handlers.offset = + XtOffsetOf(wrapped_grpc_timeval, std); + timeval_ce_handlers.free_obj = free_wrapped_grpc_timeval; +#endif } void grpc_shutdown_timeval(TSRMLS_D) {} diff --git a/src/php/ext/grpc/timeval.h b/src/php/ext/grpc/timeval.h index 7456eb6d58..d4eb2facde 100755 --- a/src/php/ext/grpc/timeval.h +++ b/src/php/ext/grpc/timeval.h @@ -50,12 +50,31 @@ extern zend_class_entry *grpc_ce_timeval; /* Wrapper struct for timeval that can be associated with a PHP object */ +#if PHP_MAJOR_VERSION < 7 + typedef struct wrapped_grpc_timeval { zend_object std; + gpr_timespec wrapped; +} wrapped_grpc_timeval; +#else + +typedef struct wrapped_grpc_timeval { gpr_timespec wrapped; + zend_object std; } wrapped_grpc_timeval; +static inline wrapped_grpc_timeval +*wrapped_grpc_timeval_from_obj(zend_object *obj) { + return (wrapped_grpc_timeval*)((char*)(obj) - + XtOffsetOf(wrapped_grpc_timeval, std)); +} + +#define Z_WRAPPED_GRPC_TIMEVAL_P(zv) \ + wrapped_grpc_timeval_from_obj(Z_OBJ_P((zv))) + +#endif /* PHP_MAJOR_VERSION */ + /* Initialize the Timeval PHP class */ void grpc_init_timeval(TSRMLS_D); @@ -63,6 +82,10 @@ void grpc_init_timeval(TSRMLS_D); void grpc_shutdown_timeval(TSRMLS_D); /* Creates a Timeval object that wraps the given timeval struct */ +#if PHP_MAJOR_VERSION < 7 zval *grpc_php_wrap_timeval(gpr_timespec wrapped TSRMLS_DC); +#else +void grpc_php_wrap_timeval(gpr_timespec wrapped, zval *timeval_object); +#endif /* PHP_MAJOR_VERSION */ #endif /* NET_GRPC_PHP_GRPC_TIMEVAL_H_ */ diff --git a/src/php/lib/Grpc/BaseStub.php b/src/php/lib/Grpc/BaseStub.php index df3fe85d44..2fec1bd9cc 100755..100644 --- a/src/php/lib/Grpc/BaseStub.php +++ b/src/php/lib/Grpc/BaseStub.php @@ -84,8 +84,8 @@ class BaseStub } if ($channel) { if (!is_a($channel, 'Channel')) { - throw new \Exception("The channel argument is not a". - "Channel object"); + throw new \Exception('The channel argument is not a'. + 'Channel object'); } $this->channel = $channel; } else { diff --git a/src/php/lib/Grpc/BidiStreamingCall.php b/src/php/lib/Grpc/BidiStreamingCall.php index 95e51c5088..c2fdb94b86 100644 --- a/src/php/lib/Grpc/BidiStreamingCall.php +++ b/src/php/lib/Grpc/BidiStreamingCall.php @@ -113,6 +113,7 @@ class BidiStreamingCall extends AbstractCall ]); $this->trailing_metadata = $status_event->status->metadata; + return $status_event->status; } } diff --git a/src/php/lib/Grpc/ClientStreamingCall.php b/src/php/lib/Grpc/ClientStreamingCall.php index 315a406735..4050f7ed06 100644 --- a/src/php/lib/Grpc/ClientStreamingCall.php +++ b/src/php/lib/Grpc/ClientStreamingCall.php @@ -88,6 +88,7 @@ class ClientStreamingCall extends AbstractCall $status = $event->status; $this->trailing_metadata = $status->metadata; + return [$this->deserializeResponse($event->message), $status]; } } diff --git a/src/php/lib/Grpc/ServerStreamingCall.php b/src/php/lib/Grpc/ServerStreamingCall.php index 53599fe4ae..ba89d9f972 100644 --- a/src/php/lib/Grpc/ServerStreamingCall.php +++ b/src/php/lib/Grpc/ServerStreamingCall.php @@ -92,6 +92,7 @@ class ServerStreamingCall extends AbstractCall ]); $this->trailing_metadata = $status_event->status->metadata; + return $status_event->status; } } diff --git a/src/php/lib/Grpc/UnaryCall.php b/src/php/lib/Grpc/UnaryCall.php index b114b771b8..a71b05dc93 100644 --- a/src/php/lib/Grpc/UnaryCall.php +++ b/src/php/lib/Grpc/UnaryCall.php @@ -77,6 +77,7 @@ class UnaryCall extends AbstractCall $status = $event->status; $this->trailing_metadata = $status->metadata; + return [$this->deserializeResponse($event->message), $status]; } } diff --git a/src/php/tests/interop/interop_client.php b/src/php/tests/interop/interop_client.php index 43b3199d92..bf40549a04 100755 --- a/src/php/tests/interop/interop_client.php +++ b/src/php/tests/interop/interop_client.php @@ -450,7 +450,7 @@ function statusCodeAndMessage($stub) { $echo_status = new grpc\testing\EchoStatus(); $echo_status->setCode(2); - $echo_status->setMessage("test status message"); + $echo_status->setMessage('test status message'); $request = new grpc\testing\SimpleRequest(); $request->setResponseStatus($echo_status); @@ -460,7 +460,7 @@ function statusCodeAndMessage($stub) hardAssert($status->code === 2, 'Received unexpected status code'); - hardAssert($status->details === "test status message", + hardAssert($status->details === 'test status message', 'Received unexpected status details'); $streaming_call = $stub->FullDuplexCall(); @@ -473,7 +473,7 @@ function statusCodeAndMessage($stub) $status = $streaming_call->getStatus(); hardAssert($status->code === 2, 'Received unexpected status code'); - hardAssert($status->details === "test status message", + hardAssert($status->details === 'test status message', 'Received unexpected status details'); } @@ -570,9 +570,9 @@ function _makeStub($args) } if ($test_case == 'unimplemented_method') { - $stub = new grpc\testing\UnimplementedServiceClient($server_address, $opts); + $stub = new grpc\testing\UnimplementedServiceClient($server_address, $opts); } else { - $stub = new grpc\testing\TestServiceClient($server_address, $opts); + $stub = new grpc\testing\TestServiceClient($server_address, $opts); } return $stub; diff --git a/src/php/tests/unit_tests/CallCredentialsTest.php b/src/php/tests/unit_tests/CallCredentialsTest.php index 5fec06cd13..1994c8afe5 100644 --- a/src/php/tests/unit_tests/CallCredentialsTest.php +++ b/src/php/tests/unit_tests/CallCredentialsTest.php @@ -148,7 +148,8 @@ class CallCredentialsTest extends PHPUnit_Framework_TestCase $this->call_credentials, $call_credentials2 ); - $this->assertSame('Grpc\CallCredentials', get_class($call_credentials3)); + $this->assertSame('Grpc\CallCredentials', + get_class($call_credentials3)); } /** diff --git a/src/php/tests/unit_tests/CallTest.php b/src/php/tests/unit_tests/CallTest.php index fa026f0935..d736f51546 100755..100644 --- a/src/php/tests/unit_tests/CallTest.php +++ b/src/php/tests/unit_tests/CallTest.php @@ -50,6 +50,18 @@ class CallTest extends PHPUnit_Framework_TestCase Grpc\Timeval::infFuture()); } + public function tearDown() + { + unset($this->call); + unset($this->channel); + } + + public function testConstructor() + { + $this->assertSame('Grpc\Call', get_class($this->call)); + $this->assertObjectHasAttribute('channel', $this->call); + } + public function testAddEmptyMetadata() { $batch = [ @@ -81,7 +93,8 @@ class CallTest extends PHPUnit_Framework_TestCase { $batch = [ Grpc\OP_SEND_INITIAL_METADATA => ['key1' => ['value1'], - 'key2' => ['value2', 'value3'], ], + 'key2' => ['value2', + 'value3'], ], ]; $result = $this->call->startBatch($batch); $this->assertTrue($result->send_metadata); @@ -118,4 +131,38 @@ class CallTest extends PHPUnit_Framework_TestCase ]; $result = $this->call->startBatch($batch); } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidConstuctor() + { + $this->call = new Grpc\Call(); + $this->assertNull($this->call); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidConstuctor2() + { + $this->call = new Grpc\Call('hi', 'hi', 'hi'); + $this->assertNull($this->call); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidSetCredentials() + { + $this->call->setCredentials('hi'); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidSetCredentials2() + { + $this->call->setCredentials([]); + } } diff --git a/src/php/tests/unit_tests/ChannelCredentialsTest.php b/src/php/tests/unit_tests/ChannelCredentialsTest.php index 56c1d8f006..e822929ccd 100644 --- a/src/php/tests/unit_tests/ChannelCredentialsTest.php +++ b/src/php/tests/unit_tests/ChannelCredentialsTest.php @@ -42,10 +42,23 @@ class ChanellCredentialsTest extends PHPUnit_Framework_TestCase { } - public function testCreateDefault() + public function testCreateSslWith3Null() { - $channel_credentials = Grpc\ChannelCredentials::createDefault(); - $this->assertSame('Grpc\ChannelCredentials', get_class($channel_credentials)); + $channel_credentials = Grpc\ChannelCredentials::createSsl(null, null, + null); + $this->assertNotNull($channel_credentials); + } + + public function testCreateSslWith3NullString() + { + $channel_credentials = Grpc\ChannelCredentials::createSsl('', '', ''); + $this->assertNotNull($channel_credentials); + } + + public function testCreateInsecure() + { + $channel_credentials = Grpc\ChannelCredentials::createInsecure(); + $this->assertNull($channel_credentials); } /** @@ -64,10 +77,4 @@ class ChanellCredentialsTest extends PHPUnit_Framework_TestCase $channel_credentials = Grpc\ChannelCredentials::createComposite( 'something', 'something'); } - - public function testCreateInsecure() - { - $channel_credentials = Grpc\ChannelCredentials::createInsecure(); - $this->assertNull($channel_credentials); - } } diff --git a/src/php/tests/unit_tests/ChannelTest.php b/src/php/tests/unit_tests/ChannelTest.php index a1f9053c39..4b35b1a28c 100644 --- a/src/php/tests/unit_tests/ChannelTest.php +++ b/src/php/tests/unit_tests/ChannelTest.php @@ -40,6 +40,7 @@ class ChannelTest extends PHPUnit_Framework_TestCase public function tearDown() { + unset($this->channel); } public function testInsecureCredentials() @@ -53,6 +54,82 @@ class ChannelTest extends PHPUnit_Framework_TestCase $this->assertSame('Grpc\Channel', get_class($this->channel)); } + public function testGetConnectivityState() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $state = $this->channel->getConnectivityState(); + $this->assertEquals(0, $state); + } + + public function testGetConnectivityStateWithInt() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $state = $this->channel->getConnectivityState(123); + $this->assertEquals(0, $state); + } + + public function testGetConnectivityStateWithString() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $state = $this->channel->getConnectivityState('hello'); + $this->assertEquals(0, $state); + } + + public function testGetConnectivityStateWithBool() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $state = $this->channel->getConnectivityState(true); + $this->assertEquals(0, $state); + } + + public function testGetTarget() + { + $this->channel = new Grpc\Channel('localhost:8888', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $target = $this->channel->getTarget(); + $this->assertTrue(is_string($target)); + } + + public function testWatchConnectivityState() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $time = new Grpc\Timeval(1000); + $state = $this->channel->watchConnectivityState(123, $time); + $this->assertTrue($state); + unset($time); + } + + public function testClose() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $this->assertNotNull($this->channel); + $this->channel->close(); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidConstructorWithNull() + { + $this->channel = new Grpc\Channel(); + $this->assertNull($this->channel); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidConstructorWith() + { + $this->channel = new Grpc\Channel('localhost', 'invalid'); + $this->assertNull($this->channel); + } + /** * @expectedException InvalidArgumentException */ @@ -78,4 +155,34 @@ class ChannelTest extends PHPUnit_Framework_TestCase ] ); } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidGetConnectivityStateWithArray() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $this->channel->getConnectivityState([]); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidWatchConnectivityState() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $this->channel->watchConnectivityState([]); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidWatchConnectivityState2() + { + $this->channel = new Grpc\Channel('localhost:0', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + $this->channel->watchConnectivityState(1, 'hi'); + } } diff --git a/src/php/tests/unit_tests/EndToEndTest.php b/src/php/tests/unit_tests/EndToEndTest.php index 2b09f9d112..09364580c0 100755..100644 --- a/src/php/tests/unit_tests/EndToEndTest.php +++ b/src/php/tests/unit_tests/EndToEndTest.php @@ -521,7 +521,8 @@ class EndToEndTest extends PHPUnit_Framework_TestCase public function testGetConnectivityState() { - $this->assertTrue($this->channel->getConnectivityState() == Grpc\CHANNEL_IDLE); + $this->assertTrue($this->channel->getConnectivityState() == + Grpc\CHANNEL_IDLE); } public function testWatchConnectivityStateFailed() diff --git a/src/php/tests/unit_tests/SecureEndToEndTest.php b/src/php/tests/unit_tests/SecureEndToEndTest.php index c388ee5031..c388ee5031 100755..100644 --- a/src/php/tests/unit_tests/SecureEndToEndTest.php +++ b/src/php/tests/unit_tests/SecureEndToEndTest.php diff --git a/src/php/tests/unit_tests/ServerTest.php b/src/php/tests/unit_tests/ServerTest.php index 76aaa06970..f2346ab113 100644 --- a/src/php/tests/unit_tests/ServerTest.php +++ b/src/php/tests/unit_tests/ServerTest.php @@ -36,10 +36,70 @@ class ServerTest extends PHPUnit_Framework_TestCase { public function setUp() { + $this->server = null; } public function tearDown() { + unset($this->server); + } + + public function testConstructorWithNull() + { + $this->server = new Grpc\Server(); + $this->assertNotNull($this->server); + } + + public function testConstructorWithNullArray() + { + $this->server = new Grpc\Server([]); + $this->assertNotNull($this->server); + } + + public function testConstructorWithArray() + { + // key of array must be string + $this->server = new Grpc\Server(['ip' => '127.0.0.1', + 'port' => '8080', ]); + $this->assertNotNull($this->server); + } + + public function testRequestCall() + { + $this->server = new Grpc\Server(); + $port = $this->server->addHttp2Port('0.0.0.0:8888'); + $this->server->start(); + $channel = new Grpc\Channel('localhost:8888', + ['credentials' => Grpc\ChannelCredentials::createInsecure()]); + + $deadline = Grpc\Timeval::infFuture(); + $call = new Grpc\Call($channel, 'dummy_method', $deadline); + + $event = $call->startBatch([Grpc\OP_SEND_INITIAL_METADATA => [], + Grpc\OP_SEND_CLOSE_FROM_CLIENT => true, + ]); + + $c = $this->server->requestCall(); + $this->assertObjectHasAttribute('call', $c); + $this->assertObjectHasAttribute('method', $c); + $this->assertSame('dummy_method', $c->method); + $this->assertObjectHasAttribute('host', $c); + $this->assertTrue(is_string($c->host)); + $this->assertObjectHasAttribute('absolute_deadline', $c); + $this->assertObjectHasAttribute('metadata', $c); + + unset($call); + unset($channel); + } + + private function createSslObj() + { + $server_credentials = Grpc\ServerCredentials::createSsl( + null, + file_get_contents(dirname(__FILE__).'/../data/server1.key'), + file_get_contents(dirname(__FILE__).'/../data/server1.pem')); + + return $server_credentials; } /** @@ -47,7 +107,8 @@ class ServerTest extends PHPUnit_Framework_TestCase */ public function testInvalidConstructor() { - $server = new Grpc\Server('invalid_host'); + $this->server = new Grpc\Server('invalid_host'); + $this->assertNull($this->server); } /** @@ -56,7 +117,7 @@ class ServerTest extends PHPUnit_Framework_TestCase public function testInvalidAddHttp2Port() { $this->server = new Grpc\Server([]); - $this->port = $this->server->addHttp2Port(['0.0.0.0:0']); + $port = $this->server->addHttp2Port(['0.0.0.0:0']); } /** @@ -65,6 +126,24 @@ class ServerTest extends PHPUnit_Framework_TestCase public function testInvalidAddSecureHttp2Port() { $this->server = new Grpc\Server([]); - $this->port = $this->server->addSecureHttp2Port(['0.0.0.0:0']); + $port = $this->server->addSecureHttp2Port(['0.0.0.0:0']); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidAddSecureHttp2Port2() + { + $this->server = new Grpc\Server(); + $port = $this->server->addSecureHttp2Port('0.0.0.0:0'); + } + + /** + * @expectedException InvalidArgumentException + */ + public function testInvalidAddSecureHttp2Port3() + { + $this->server = new Grpc\Server(); + $port = $this->server->addSecureHttp2Port('0.0.0.0:0', 'invalid'); } } diff --git a/src/php/tests/unit_tests/TimevalTest.php b/src/php/tests/unit_tests/TimevalTest.php index a3dbce079f..2d19f64c79 100755..100644 --- a/src/php/tests/unit_tests/TimevalTest.php +++ b/src/php/tests/unit_tests/TimevalTest.php @@ -33,6 +33,57 @@ */ class TimevalTest extends PHPUnit_Framework_TestCase { + public function setUp() + { + } + + public function tearDown() + { + unset($this->time); + } + + public function testConstructorWithInt() + { + $this->time = new Grpc\Timeval(1234); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + + public function testConstructorWithNegative() + { + $this->time = new Grpc\Timeval(-123); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + + public function testConstructorWithZero() + { + $this->time = new Grpc\Timeval(0); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + + public function testConstructorWithOct() + { + $this->time = new Grpc\Timeval(0123); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + + public function testConstructorWithHex() + { + $this->time = new Grpc\Timeval(0x1A); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + + public function testConstructorWithFloat() + { + $this->time = new Grpc\Timeval(123.456); + $this->assertNotNull($this->time); + $this->assertSame('Grpc\Timeval', get_class($this->time)); + } + public function testCompareSame() { $zero = Grpc\Timeval::zero(); @@ -70,6 +121,7 @@ class TimevalTest extends PHPUnit_Framework_TestCase public function testNowAndAdd() { $now = Grpc\Timeval::now(); + $this->assertNotNull($now); $delta = new Grpc\Timeval(1000); $deadline = $now->add($delta); $this->assertGreaterThan(0, Grpc\Timeval::compare($deadline, $now)); @@ -154,5 +206,6 @@ class TimevalTest extends PHPUnit_Framework_TestCase public function testSimilarInvalidParam() { $a = Grpc\Timeval::similar(1000, 1100, 1200); + $this->assertNull($delta); } } diff --git a/src/proto/grpc/testing/control.proto b/src/proto/grpc/testing/control.proto index 20496a8116..ece6910815 100644 --- a/src/proto/grpc/testing/control.proto +++ b/src/proto/grpc/testing/control.proto @@ -229,4 +229,7 @@ message ScenarioResult { repeated int32 server_cores = 5; // An after-the-fact computed summary ScenarioResultSummary summary = 6; + // Information on success or failure of each worker + repeated bool client_success = 7; + repeated bool server_success = 8; } diff --git a/src/python/grpcio/_unixccompiler_patch.py b/src/python/grpcio/_unixccompiler_patch.py index 9a697989b3..894c3ef395 100644 --- a/src/python/grpcio/_unixccompiler_patch.py +++ b/src/python/grpcio/_unixccompiler_patch.py @@ -34,88 +34,44 @@ from distutils import errors from distutils import unixccompiler import os import os.path +import shlex import shutil import sys import tempfile +def _unix_commandfile_spawn(self, command): + """Wrapper around distutils.util.spawn that attempts to use command files. -def _unix_piecemeal_link( - self, target_desc, objects, output_filename, output_dir=None, - libraries=None, library_dirs=None, runtime_library_dirs=None, - export_symbols=None, debug=0, extra_preargs=None, extra_postargs=None, - build_temp=None, target_lang=None): - """`link` externalized method taken almost verbatim from UnixCCompiler. + Meant to replace the CCompiler method `spawn` on UnixCCompiler and its + derivatives (e.g. the MinGW32 compiler). - Modifies the link command for unix-like compilers by using a command file so - that long command line argument strings don't break the command shell's - ARG_MAX character limit. + Some commands like `gcc` (and friends like `clang`) support command files to + work around shell command length limits. """ - objects, output_dir = self._fix_object_args(objects, output_dir) - libraries, library_dirs, runtime_library_dirs = self._fix_lib_args( - libraries, library_dirs, runtime_library_dirs) - # filter out standard library paths, which are not explicitely needed - # for linking - library_dirs = [dir for dir in library_dirs - if not dir in ('/lib', '/lib64', '/usr/lib', '/usr/lib64')] - runtime_library_dirs = [dir for dir in runtime_library_dirs - if not dir in ('/lib', '/lib64', '/usr/lib', '/usr/lib64')] - lib_opts = ccompiler.gen_lib_options(self, library_dirs, runtime_library_dirs, - libraries) - if (not (isinstance(output_dir, str) or isinstance(output_dir, bytes)) - and output_dir is not None): - raise TypeError("'output_dir' must be a string or None") - if output_dir is not None: - output_filename = os.path.join(output_dir, output_filename) - - if self._need_link(objects, output_filename): - ld_args = (objects + self.objects + - lib_opts + ['-o', output_filename]) - if debug: - ld_args[:0] = ['-g'] - if extra_preargs: - ld_args[:0] = extra_preargs - if extra_postargs: - ld_args.extend(extra_postargs) - self.mkpath(os.path.dirname(output_filename)) - try: - if target_desc == ccompiler.CCompiler.EXECUTABLE: - linker = self.linker_exe[:] - else: - linker = self.linker_so[:] - if target_lang == "c++" and self.compiler_cxx: - # skip over environment variable settings if /usr/bin/env - # is used to set up the linker's environment. - # This is needed on OSX. Note: this assumes that the - # normal and C++ compiler have the same environment - # settings. - i = 0 - if os.path.basename(linker[0]) == "env": - i = 1 - while '=' in linker[i]: - i = i + 1 - - linker[i] = self.compiler_cxx[i] - - if sys.platform == 'darwin': - import _osx_support - linker = _osx_support.compiler_fixup(linker, ld_args) - - temporary_directory = tempfile.mkdtemp() - command_filename = os.path.abspath( - os.path.join(temporary_directory, 'command')) - with open(command_filename, 'w') as command_file: - escaped_ld_args = [arg.replace('\\', '\\\\') for arg in ld_args] - command_file.write(' '.join(escaped_ld_args)) - self.spawn(linker + ['@{}'.format(command_filename)]) - except errors.DistutilsExecError: - raise ccompiler.LinkError + # Sometimes distutils embeds the executables as full strings including some + # hard-coded flags rather than as lists. + command = list(shlex.split(command[0])) + list(command[1:]) + command_base = os.path.basename(command[0].strip()) + if command_base == 'ccache': + command_base = command[:2] + command_args = command[2:] + elif command_base.startswith('ccache') or command_base in ['gcc', 'clang', 'clang++', 'g++']: + command_base = command[:1] + command_args = command[1:] else: - log.debug("skipping %s (up-to-date)", output_filename) + return ccompiler.CCompiler.spawn(self, command) + temporary_directory = tempfile.mkdtemp() + command_filename = os.path.abspath(os.path.join(temporary_directory, 'command')) + with open(command_filename, 'w') as command_file: + escaped_args = [arg.replace('\\', '\\\\') for arg in command_args] + command_file.write(' '.join(escaped_args)) + modified_command = command_base + ['@{}'.format(command_filename)] + result = ccompiler.CCompiler.spawn(self, modified_command) + shutil.rmtree(temporary_directory) + return result + -# TODO(atash) try replacing this monkeypatch of the compiler harness' link -# operation with a monkeypatch of the distutils `spawn` that applies -# command-argument-file hacks where it can. Might be cleaner. def monkeypatch_unix_compiler(): """Monkeypatching is dumb, but it's either that or we become maintainers of something much, much bigger.""" - unixccompiler.UnixCCompiler.link = _unix_piecemeal_link + unixccompiler.UnixCCompiler.spawn = _unix_commandfile_spawn diff --git a/src/python/grpcio/grpc/_channel.py b/src/python/grpcio/grpc/_channel.py index 29dbc3a668..3117dd1cb3 100644 --- a/src/python/grpcio/grpc/_channel.py +++ b/src/python/grpcio/grpc/_channel.py @@ -353,12 +353,12 @@ class _Rendezvous(grpc.RpcError, grpc.Future, grpc.Call): else: return max(self._deadline - time.time(), 0) - def add_cancellation_callback(self, callback): + def add_callback(self, callback): with self._state.condition: if self._state.callbacks is None: return False else: - self._state.callbacks.append(lambda unused_future: callback()) + self._state.callbacks.append(lambda: callback()) return True def initial_metadata(self): diff --git a/src/python/grpcio/grpc/beta/_client_adaptations.py b/src/python/grpcio/grpc/beta/_client_adaptations.py index 73415e0be7..e4ee44d7a3 100644 --- a/src/python/grpcio/grpc/beta/_client_adaptations.py +++ b/src/python/grpcio/grpc/beta/_client_adaptations.py @@ -67,7 +67,7 @@ def _abortion(rpc_error_call): error_kind = face.Abortion.Kind.LOCAL_FAILURE if pair is None else pair[0] return face.Abortion( error_kind, rpc_error_call.initial_metadata(), - rpc_error_call.trailing_metadata(), code, rpc_error_code.details()) + rpc_error_call.trailing_metadata(), code, rpc_error_call.details()) def _abortion_error(rpc_error_call): @@ -159,9 +159,11 @@ class _Rendezvous(future.Future, face.Call): return self._call.time_remaining() def add_abortion_callback(self, abortion_callback): - registered = self._call.add_callback( - lambda: abortion_callback(_abortion(self._call))) - return None if registered else _abortion(self._call) + def done_callback(): + if self.code() is not grpc.StatusCode.OK: + abortion_callback(_abortion(self._call)) + registered = self._call.add_callback(done_callback) + return None if registered else done_callback() def protocol_context(self): return _InvocationProtocolContext() diff --git a/src/python/grpcio_health_checking/MANIFEST.in b/src/python/grpcio_health_checking/MANIFEST.in index 7d26647697..7407f646d1 100644 --- a/src/python/grpcio_health_checking/MANIFEST.in +++ b/src/python/grpcio_health_checking/MANIFEST.in @@ -1,3 +1,4 @@ +include grpc_version.py include health_commands.py -graft grpc_health +graft grpc global-exclude *.pyc diff --git a/src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py b/src/python/grpcio_health_checking/grpc/__init__.py index 7086519106..fcc7048815 100644 --- a/src/python/grpcio_health_checking/grpc_health/health/v1/__init__.py +++ b/src/python/grpcio_health_checking/grpc/__init__.py @@ -27,4 +27,4 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - +__import__('pkg_resources').declare_namespace(__name__) diff --git a/src/python/grpcio_health_checking/grpc_health/__init__.py b/src/python/grpcio_health_checking/grpc/health/__init__.py index 7086519106..7086519106 100644 --- a/src/python/grpcio_health_checking/grpc_health/__init__.py +++ b/src/python/grpcio_health_checking/grpc/health/__init__.py diff --git a/src/python/grpcio_health_checking/grpc_health/health/__init__.py b/src/python/grpcio_health_checking/grpc/health/v1/__init__.py index 7086519106..7086519106 100644 --- a/src/python/grpcio_health_checking/grpc_health/health/__init__.py +++ b/src/python/grpcio_health_checking/grpc/health/v1/__init__.py diff --git a/src/python/grpcio_health_checking/grpc_health/health/v1/health.py b/src/python/grpcio_health_checking/grpc/health/v1/health.py index 8da60c70cb..8108ac1096 100644 --- a/src/python/grpcio_health_checking/grpc_health/health/v1/health.py +++ b/src/python/grpcio_health_checking/grpc/health/v1/health.py @@ -31,10 +31,12 @@ import threading -from grpc_health.health.v1 import health_pb2 +import grpc +from grpc.health.v1 import health_pb2 -class HealthServicer(health_pb2.BetaHealthServicer): + +class HealthServicer(health_pb2.HealthServicer): """Servicer handling RPCs for service statuses.""" def __init__(self): @@ -43,14 +45,12 @@ class HealthServicer(health_pb2.BetaHealthServicer): def Check(self, request, context): with self._server_status_lock: - if request.service not in self._server_status: - # TODO(atash): once the Python API has a way of setting the server - # status, bring us into conformance with the health check spec by - # returning the NOT_FOUND status here. - raise NotImplementedError() + status = self._server_status.get(request.service) + if status is None: + context.set_code(grpc.StatusCode.NOT_FOUND) + return health_pb2.HealthCheckResponse() else: - return health_pb2.HealthCheckResponse( - status=self._server_status[request.service]) + return health_pb2.HealthCheckResponse(status=status) def set(self, service, status): """Sets the status of a service. @@ -63,4 +63,3 @@ class HealthServicer(health_pb2.BetaHealthServicer): """ with self._server_status_lock: self._server_status[service] = status - diff --git a/src/python/grpcio_health_checking/grpc_version.py b/src/python/grpcio_health_checking/grpc_version.py new file mode 100644 index 0000000000..2e48fde893 --- /dev/null +++ b/src/python/grpcio_health_checking/grpc_version.py @@ -0,0 +1,32 @@ +# Copyright 2016, Google Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +# AUTO-GENERATED FROM `$REPO_ROOT/templates/src/python/grpcio_health_checking/grpc_version.py.template`!!! + +VERSION='1.0.0rc1' diff --git a/src/python/grpcio_health_checking/health_commands.py b/src/python/grpcio_health_checking/health_commands.py index a7a59f6974..66df25da63 100644 --- a/src/python/grpcio_health_checking/health_commands.py +++ b/src/python/grpcio_health_checking/health_commands.py @@ -29,16 +29,10 @@ """Provides distutils command classes for the GRPC Python setup process.""" -import distutils -import glob import os -import os.path import shutil -import subprocess -import sys import setuptools -from setuptools.command import build_py ROOT_DIR = os.path.abspath(os.path.dirname(os.path.abspath(__file__))) HEALTH_PROTO = os.path.join(ROOT_DIR, '../../proto/grpc/health/v1/health.proto') @@ -60,12 +54,25 @@ class CopyProtoModules(setuptools.Command): if os.path.isfile(HEALTH_PROTO): shutil.copyfile( HEALTH_PROTO, - os.path.join(ROOT_DIR, 'grpc_health/health/v1/health.proto')) + os.path.join(ROOT_DIR, 'grpc/health/v1/health.proto')) -class BuildPy(build_py.build_py): - """Custom project build command.""" +class BuildPackageProtos(setuptools.Command): + """Command to generate project *_pb2.py modules from proto files.""" + + description = 'build grpc protobuf modules' + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass def run(self): - self.run_command('build_proto_modules') - build_py.build_py.run(self) + # due to limitations of the proto generator, we require that only *one* + # directory is provided as an 'include' directory. We assume it's the '' key + # to `self.distribution.package_dir` (and get a key error if it's not + # there). + from grpc.tools import command + command.build_package_protos(self.distribution.package_dir['']) diff --git a/src/python/grpcio_health_checking/setup.py b/src/python/grpcio_health_checking/setup.py index 70b4575bf5..727d628885 100644 --- a/src/python/grpcio_health_checking/setup.py +++ b/src/python/grpcio_health_checking/setup.py @@ -30,49 +30,42 @@ """Setup module for the GRPC Python package's optional health checking.""" import os -import os.path import sys -from distutils import core as _core import setuptools -import grpc.tools.command - # Ensure we're in the proper directory whether or not we're being used by pip. os.chdir(os.path.dirname(os.path.abspath(__file__))) # Break import-style to ensure we can actually find our commands module. import health_commands - -PACKAGES = ( - setuptools.find_packages('.') -) +import grpc_version PACKAGE_DIRECTORIES = { '': '.', } SETUP_REQUIRES = ( - 'grpcio-tools>=0.14.0', + 'grpcio-tools>=0.15.0', ) INSTALL_REQUIRES = ( - 'grpcio>=0.13.1', + 'grpcio>=0.15.0', ) COMMAND_CLASS = { # Run preprocess from the repository *before* doing any packaging! 'preprocess': health_commands.CopyProtoModules, - - 'build_proto_modules': grpc.tools.command.BuildProtoModules, - 'build_py': health_commands.BuildPy, + 'build_package_protos': health_commands.BuildPackageProtos, } setuptools.setup( name='grpcio-health-checking', - version='0.14.0', - packages=list(PACKAGES), + version=grpc_version.VERSION, + license='3-clause BSD', package_dir=PACKAGE_DIRECTORIES, + packages=setuptools.find_packages('.'), + namespace_packages=['grpc'], install_requires=INSTALL_REQUIRES, setup_requires=SETUP_REQUIRES, cmdclass=COMMAND_CLASS diff --git a/src/python/grpcio_tests/commands.py b/src/python/grpcio_tests/commands.py index 171829b62f..5ee551cfe1 100644 --- a/src/python/grpcio_tests/commands.py +++ b/src/python/grpcio_tests/commands.py @@ -138,7 +138,7 @@ class BuildPy(build_py.build_py): def run(self): try: - self.run_command('build_proto_modules') + self.run_command('build_package_protos') except CommandError as error: sys.stderr.write('warning: %s\n' % error.message) build_py.build_py.run(self) diff --git a/src/python/grpcio_tests/setup.py b/src/python/grpcio_tests/setup.py index 7eef420bdb..0afaf7dfa2 100644 --- a/src/python/grpcio_tests/setup.py +++ b/src/python/grpcio_tests/setup.py @@ -75,7 +75,7 @@ COMMAND_CLASS = { # Run `preprocess` *before* doing any packaging! 'preprocess': commands.GatherProto, - 'build_proto_modules': grpc.tools.command.BuildProtoModules, + 'build_package_protos': grpc.tools.command.BuildPackageProtos, 'build_py': commands.BuildPy, 'run_interop': commands.RunInterop, 'test_lite': commands.TestLite diff --git a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py index 1b63388663..80300d13df 100644 --- a/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py +++ b/src/python/grpcio_tests/tests/health_check/_health_servicer_test.py @@ -27,48 +27,68 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -"""Tests of grpc_health.health.v1.health.""" +"""Tests of grpc.health.v1.health.""" import unittest -from grpc_health.health.v1 import health -from grpc_health.health.v1 import health_pb2 +import grpc +from grpc.framework.foundation import logging_pool +from grpc.health.v1 import health +from grpc.health.v1 import health_pb2 + +from tests.unit.framework.common import test_constants class HealthServicerTest(unittest.TestCase): def setUp(self): - self.servicer = health.HealthServicer() - self.servicer.set('', health_pb2.HealthCheckResponse.SERVING) - self.servicer.set('grpc.test.TestServiceServing', - health_pb2.HealthCheckResponse.SERVING) - self.servicer.set('grpc.test.TestServiceUnknown', - health_pb2.HealthCheckResponse.UNKNOWN) - self.servicer.set('grpc.test.TestServiceNotServing', - health_pb2.HealthCheckResponse.NOT_SERVING) + servicer = health.HealthServicer() + servicer.set('', health_pb2.HealthCheckResponse.SERVING) + servicer.set('grpc.test.TestServiceServing', + health_pb2.HealthCheckResponse.SERVING) + servicer.set('grpc.test.TestServiceUnknown', + health_pb2.HealthCheckResponse.UNKNOWN) + servicer.set('grpc.test.TestServiceNotServing', + health_pb2.HealthCheckResponse.NOT_SERVING) + server_pool = logging_pool.pool(test_constants.THREAD_CONCURRENCY) + self._server = grpc.server(server_pool) + port = self._server.add_insecure_port('[::]:0') + health_pb2.add_HealthServicer_to_server(servicer, self._server) + self._server.start() + + channel = grpc.insecure_channel('localhost:%d' % port) + self._stub = health_pb2.HealthStub(channel) def test_empty_service(self): request = health_pb2.HealthCheckRequest() - resp = self.servicer.Check(request, None) - self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING) + resp = self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) def test_serving_service(self): request = health_pb2.HealthCheckRequest( service='grpc.test.TestServiceServing') - resp = self.servicer.Check(request, None) - self.assertEqual(resp.status, health_pb2.HealthCheckResponse.SERVING) + resp = self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.SERVING, resp.status) def test_unknown_serivce(self): request = health_pb2.HealthCheckRequest( service='grpc.test.TestServiceUnknown') - resp = self.servicer.Check(request, None) - self.assertEqual(resp.status, health_pb2.HealthCheckResponse.UNKNOWN) + resp = self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.UNKNOWN, resp.status) def test_not_serving_service(self): request = health_pb2.HealthCheckRequest( service='grpc.test.TestServiceNotServing') - resp = self.servicer.Check(request, None) - self.assertEqual(resp.status, health_pb2.HealthCheckResponse.NOT_SERVING) + resp = self._stub.Check(request) + self.assertEqual(health_pb2.HealthCheckResponse.NOT_SERVING, resp.status) + + def test_not_found_service(self): + request = health_pb2.HealthCheckRequest( + service='not-found') + with self.assertRaises(grpc.RpcError) as context: + resp = self._stub.Check(request) + + self.assertEqual(grpc.StatusCode.NOT_FOUND, context.exception.code()) if __name__ == '__main__': diff --git a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py index d32208f9eb..df620b19ba 100644 --- a/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py +++ b/src/python/grpcio_tests/tests/unit/framework/interfaces/face/_future_invocation_asynchronous_event_service.py @@ -434,11 +434,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. for test_messages in test_messages_sequence: request = test_messages.request() callback = _Callback() + abortion_callback = _Callback() with self._control.fail(): response_future = self._invoker.future(group, method)( request, _3069_test_constant.REALLY_SHORT_TIMEOUT) response_future.add_done_callback(callback) + response_future.add_abortion_callback(abortion_callback) self.assertIs(callback.future(), response_future) # Because the servicer fails outside of the thread from which the @@ -450,6 +452,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. with self.assertRaises(face.ExpirationError): response_future.result() self.assertIsNotNone(response_future.traceback()) + self.assertIsNotNone(abortion_callback.future()) def testFailedUnaryRequestStreamResponse(self): for (group, method), test_messages_sequence in ( @@ -472,11 +475,13 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. for test_messages in test_messages_sequence: requests = test_messages.requests() callback = _Callback() + abortion_callback = _Callback() with self._control.fail(): response_future = self._invoker.future(group, method)( iter(requests), _3069_test_constant.REALLY_SHORT_TIMEOUT) response_future.add_done_callback(callback) + response_future.add_abortion_callback(abortion_callback) self.assertIs(callback.future(), response_future) # Because the servicer fails outside of the thread from which the @@ -488,6 +493,7 @@ class TestCase(six.with_metaclass(abc.ABCMeta, test_coverage.Coverage, unittest. with self.assertRaises(face.ExpirationError): response_future.result() self.assertIsNotNone(response_future.traceback()) + self.assertIsNotNone(abortion_callback.future()) def testFailedStreamRequestStreamResponse(self): for (group, method), test_messages_sequence in ( diff --git a/src/ruby/ext/grpc/rb_channel.c b/src/ruby/ext/grpc/rb_channel.c index 18a15d0125..e6d30a174b 100644 --- a/src/ruby/ext/grpc/rb_channel.c +++ b/src/ruby/ext/grpc/rb_channel.c @@ -40,6 +40,7 @@ #include <grpc/grpc_security.h> #include <grpc/support/alloc.h> #include <grpc/support/log.h> +#include <grpc/support/time.h> #include "rb_grpc.h" #include "rb_call.h" #include "rb_channel_args.h" @@ -71,6 +72,7 @@ typedef struct grpc_rb_channel { /* The actual channel */ grpc_channel *wrapped; + grpc_completion_queue *queue; } grpc_rb_channel; /* Destroys Channel instances. */ @@ -83,6 +85,7 @@ static void grpc_rb_channel_free(void *p) { if (ch->wrapped != NULL) { grpc_channel_destroy(ch->wrapped); + grpc_rb_completion_queue_destroy(ch->queue); } xfree(p); @@ -165,6 +168,7 @@ static VALUE grpc_rb_channel_init(int argc, VALUE *argv, VALUE self) { } rb_ivar_set(self, id_target, target); wrapper->wrapped = ch; + wrapper->queue = grpc_completion_queue_create(NULL); return self; } @@ -203,16 +207,18 @@ static VALUE grpc_rb_channel_get_connectivity_state(int argc, VALUE *argv, the completion queue with success=0 */ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, VALUE last_state, - VALUE cqueue, - VALUE deadline, - VALUE tag) { + VALUE deadline) { grpc_rb_channel *wrapper = NULL; grpc_channel *ch = NULL; grpc_completion_queue *cq = NULL; - cq = grpc_rb_get_wrapped_completion_queue(cqueue); + void *tag = wrapper; + + grpc_event event; + TypedData_Get_Struct(self, grpc_rb_channel, &grpc_channel_data_type, wrapper); ch = wrapper->wrapped; + cq = wrapper->queue; if (ch == NULL) { rb_raise(rb_eRuntimeError, "closed!"); return Qnil; @@ -222,9 +228,16 @@ static VALUE grpc_rb_channel_watch_connectivity_state(VALUE self, (grpc_connectivity_state)NUM2LONG(last_state), grpc_rb_time_timeval(deadline, /* absolute time */ 0), cq, - ROBJECT(tag)); + tag); - return Qnil; + event = rb_completion_queue_pluck(cq, tag, + gpr_inf_future(GPR_CLOCK_REALTIME), NULL); + + if (event.success) { + return Qtrue; + } else { + return Qfalse; + } } /* Create a call given a grpc_channel, in order to call method. The request diff --git a/src/ruby/ext/grpc/rb_completion_queue.h b/src/ruby/ext/grpc/rb_completion_queue.h index 9f8f6aa5ff..aa9dc6416a 100644 --- a/src/ruby/ext/grpc/rb_completion_queue.h +++ b/src/ruby/ext/grpc/rb_completion_queue.h @@ -38,9 +38,6 @@ #include <grpc/grpc.h> -/* Gets the wrapped completion queue from the ruby wrapper */ -grpc_completion_queue *grpc_rb_get_wrapped_completion_queue(VALUE v); - void grpc_rb_completion_queue_destroy(grpc_completion_queue *cq); /** |