diff options
author | ctiller <ctiller@google.com> | 2015-01-07 12:13:17 -0800 |
---|---|---|
committer | Nicolas Noble <nnoble@google.com> | 2015-01-09 17:23:18 -0800 |
commit | e4b409364e4c493a66d4b2a6fe897075aa5c174e (patch) | |
tree | 29467626f50aea49e072e15004dd141625146709 /src/core/surface | |
parent | 8232204a36712553b9eedb2dacab13b7c38642c6 (diff) |
Add a --forever flag, to continuously run tests as things change.
Change on 2015/01/07 by ctiller <ctiller@google.com>
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=83451760
Diffstat (limited to 'src/core/surface')
-rw-r--r-- | src/core/surface/call.c | 4 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 66 | ||||
-rw-r--r-- | src/core/surface/server.c | 13 | ||||
-rw-r--r-- | src/core/surface/server.h | 3 | ||||
-rw-r--r-- | src/core/surface/server_chttp2.c | 4 |
5 files changed, 42 insertions, 48 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 9c5f5064eb..9ed617f665 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) { return &call->incoming_metadata; } -static void call_alarm(void *arg, int success) { +static void call_alarm(void *arg, grpc_iomgr_cb_status status) { grpc_call *call = arg; - if (success) { + if (status == GRPC_CALLBACK_SUCCESS) { grpc_call_cancel(call); } grpc_call_internal_unref(call); diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index b59c36e03a..4837f5b978 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -36,7 +36,7 @@ #include <stdio.h> #include <string.h> -#include "src/core/iomgr/pollset.h" +#include "src/core/iomgr/iomgr_completion_queue_interface.h" #include "src/core/surface/call.h" #include "src/core/surface/event_string.h" #include "src/core/surface/surface_trace.h" @@ -61,7 +61,6 @@ typedef struct event { /* Completion queue structure */ struct grpc_completion_queue { - /* TODO(ctiller): see if this can be removed */ int allow_polling; /* When refs drops to zero, we are in shutdown mode, and will be destroyable @@ -101,7 +100,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { /* Create and append an event to the queue. Returns the event so that its data members can be filled in. - Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ + Requires grpc_iomgr_mu locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data) { @@ -127,8 +126,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, ev->bucket_prev = cc->buckets[bucket]->bucket_prev; ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev; } - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - grpc_pollset_kick(&cc->pollset); + gpr_cv_broadcast(&grpc_iomgr_cv); return ev; } @@ -151,7 +149,7 @@ static void end_op_locked(grpc_completion_queue *cc, if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); + gpr_cv_broadcast(&grpc_iomgr_cv); } } @@ -159,11 +157,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_event_finish_func on_finish, void *user_data, grpc_byte_buffer *read) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data); ev->base.data.read = read; end_op_locked(cc, GRPC_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, @@ -171,11 +169,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.invoke_accepted = error; end_op_locked(cc, GRPC_INVOKE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, @@ -183,11 +181,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.write_accepted = error; end_op_locked(cc, GRPC_WRITE_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, @@ -195,11 +193,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag, grpc_event_finish_func on_finish, void *user_data, grpc_op_error error) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data); ev->base.data.finish_accepted = error; end_op_locked(cc, GRPC_FINISH_ACCEPTED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, @@ -208,13 +206,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag, void *user_data, size_t count, grpc_metadata *elements) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish, user_data); ev->base.data.client_metadata_read.count = count; ev->base.data.client_metadata_read.elements = elements; end_op_locked(cc, GRPC_CLIENT_METADATA_READ); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -223,14 +221,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call, grpc_metadata *metadata_elements, size_t metadata_count) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data); ev->base.data.finished.status = status; ev->base.data.finished.details = details; ev->base.data.finished.metadata_count = metadata_count; ev->base.data.finished.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_FINISHED); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, @@ -239,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, gpr_timespec deadline, size_t metadata_count, grpc_metadata *metadata_elements) { event *ev; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data); ev->base.data.server_rpc_new.method = method; ev->base.data.server_rpc_new.host = host; @@ -247,7 +245,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call, ev->base.data.server_rpc_new.metadata_count = metadata_count; ev->base.data.server_rpc_new.metadata_elements = metadata_elements; end_op_locked(cc, GRPC_SERVER_RPC_NEW); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); } /* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */ @@ -264,7 +262,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if (cc->queue != NULL) { gpr_uintptr bucket; @@ -290,16 +288,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -337,7 +334,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, gpr_timespec deadline) { event *ev = NULL; - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); for (;;) { if ((ev = pluck_event(cc, tag))) { break; @@ -346,16 +343,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, ev = create_shutdown_event(); break; } - if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) { + if (cc->allow_polling && grpc_iomgr_work(deadline)) { continue; } - if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), - GRPC_POLLSET_MU(&cc->pollset), deadline)) { - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) { + gpr_mu_unlock(&grpc_iomgr_mu); return NULL; } } - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_unlock(&grpc_iomgr_mu); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); return &ev->base; } @@ -364,11 +360,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, to zero here, then enter shutdown mode and wake up any waiters */ void grpc_completion_queue_shutdown(grpc_completion_queue *cc) { if (gpr_unref(&cc->refs)) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_mu_lock(&grpc_iomgr_mu); GPR_ASSERT(!cc->shutdown); cc->shutdown = 1; - gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset)); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + gpr_cv_broadcast(&grpc_iomgr_cv); + gpr_mu_unlock(&grpc_iomgr_mu); } } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index aa544a97f2..3829e7aa8f 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list; typedef struct listener { void *arg; - void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset); + void (*start)(grpc_server *server, void *arg); void (*destroy)(grpc_server *server, void *arg); struct listener *next; } listener; @@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) { chand->next = chand->prev = chand; } -static void finish_destroy_channel(void *cd, int success) { +static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_server *server = chand->server; /*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/ @@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) { gpr_mu_unlock(&server->mu); } -static void kill_zombie(void *elem, int success) { +static void kill_zombie(void *elem, grpc_iomgr_cb_status status) { grpc_call_destroy(grpc_call_from_top_element(elem)); } @@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem, } } -static void finish_shutdown_channel(void *cd, int success) { +static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) { channel_data *chand = cd; grpc_channel_op op; op.type = GRPC_CHANNEL_DISCONNECT; @@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) { listener *l; for (l = server->listeners; l; l = l->next) { - l->start(server, l->arg, grpc_cq_pollset(server->cq)); + l->start(server, l->arg); } } @@ -596,8 +596,7 @@ void grpc_server_destroy(grpc_server *server) { } void grpc_server_add_listener(grpc_server *server, void *arg, - void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + void (*start)(grpc_server *server, void *arg), void (*destroy)(grpc_server *server, void *arg)) { listener *l = gpr_malloc(sizeof(listener)); l->arg = arg; diff --git a/src/core/surface/server.h b/src/core/surface/server.h index 61292ebe4e..f0773ab9d5 100644 --- a/src/core/surface/server.h +++ b/src/core/surface/server.h @@ -47,8 +47,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq, /* Add a listener to the server: when the server starts, it will call start, and when it shuts down, it will call destroy */ void grpc_server_add_listener(grpc_server *server, void *listener, - void (*start)(grpc_server *server, void *arg, - grpc_pollset *pollset), + void (*start)(grpc_server *server, void *arg), void (*destroy)(grpc_server *server, void *arg)); /* Setup a transport - creates a channel stack, binds the transport to the diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c index a0961bd449..a5fdd03774 100644 --- a/src/core/surface/server_chttp2.c +++ b/src/core/surface/server_chttp2.c @@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) { } /* Server callback: start listening on our ports */ -static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) { +static void start(grpc_server *server, void *tcpp) { grpc_tcp_server *tcp = tcpp; - grpc_tcp_server_start(tcp, pollset, new_transport, server); + grpc_tcp_server_start(tcp, new_transport, server); } /* Server callback: destroy the tcp listener (so we don't generate further |