aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface
diff options
context:
space:
mode:
authorGravatar ctiller <ctiller@google.com>2015-01-07 12:13:17 -0800
committerGravatar Nicolas Noble <nnoble@google.com>2015-01-09 17:23:18 -0800
commite4b409364e4c493a66d4b2a6fe897075aa5c174e (patch)
tree29467626f50aea49e072e15004dd141625146709 /src/core/surface
parent8232204a36712553b9eedb2dacab13b7c38642c6 (diff)
Add a --forever flag, to continuously run tests as things change.
Change on 2015/01/07 by ctiller <ctiller@google.com> ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=83451760
Diffstat (limited to 'src/core/surface')
-rw-r--r--src/core/surface/call.c4
-rw-r--r--src/core/surface/completion_queue.c66
-rw-r--r--src/core/surface/server.c13
-rw-r--r--src/core/surface/server.h3
-rw-r--r--src/core/surface/server_chttp2.c4
5 files changed, 42 insertions, 48 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9c5f5064eb..9ed617f665 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -878,9 +878,9 @@ grpc_metadata_buffer *grpc_call_get_metadata_buffer(grpc_call *call) {
return &call->incoming_metadata;
}
-static void call_alarm(void *arg, int success) {
+static void call_alarm(void *arg, grpc_iomgr_cb_status status) {
grpc_call *call = arg;
- if (success) {
+ if (status == GRPC_CALLBACK_SUCCESS) {
grpc_call_cancel(call);
}
grpc_call_internal_unref(call);
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index b59c36e03a..4837f5b978 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -36,7 +36,7 @@
#include <stdio.h>
#include <string.h>
-#include "src/core/iomgr/pollset.h"
+#include "src/core/iomgr/iomgr_completion_queue_interface.h"
#include "src/core/surface/call.h"
#include "src/core/surface/event_string.h"
#include "src/core/surface/surface_trace.h"
@@ -61,7 +61,6 @@ typedef struct event {
/* Completion queue structure */
struct grpc_completion_queue {
- /* TODO(ctiller): see if this can be removed */
int allow_polling;
/* When refs drops to zero, we are in shutdown mode, and will be destroyable
@@ -101,7 +100,7 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
/* Create and append an event to the queue. Returns the event so that its data
members can be filled in.
- Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
+ Requires grpc_iomgr_mu locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data) {
@@ -127,8 +126,7 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
ev->bucket_prev = cc->buckets[bucket]->bucket_prev;
ev->bucket_next->bucket_prev = ev->bucket_prev->bucket_next = ev;
}
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- grpc_pollset_kick(&cc->pollset);
+ gpr_cv_broadcast(&grpc_iomgr_cv);
return ev;
}
@@ -151,7 +149,7 @@ static void end_op_locked(grpc_completion_queue *cc,
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
}
}
@@ -159,11 +157,11 @@ void grpc_cq_end_read(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_event_finish_func on_finish, void *user_data,
grpc_byte_buffer *read) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_READ, tag, call, on_finish, user_data);
ev->base.data.read = read;
end_op_locked(cc, GRPC_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
@@ -171,11 +169,11 @@ void grpc_cq_end_invoke_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_INVOKE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.invoke_accepted = error;
end_op_locked(cc, GRPC_INVOKE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
@@ -183,11 +181,11 @@ void grpc_cq_end_write_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_WRITE_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.write_accepted = error;
end_op_locked(cc, GRPC_WRITE_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
@@ -195,11 +193,11 @@ void grpc_cq_end_finish_accepted(grpc_completion_queue *cc, void *tag,
grpc_event_finish_func on_finish,
void *user_data, grpc_op_error error) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISH_ACCEPTED, tag, call, on_finish, user_data);
ev->base.data.finish_accepted = error;
end_op_locked(cc, GRPC_FINISH_ACCEPTED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
@@ -208,13 +206,13 @@ void grpc_cq_end_client_metadata_read(grpc_completion_queue *cc, void *tag,
void *user_data, size_t count,
grpc_metadata *elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_CLIENT_METADATA_READ, tag, call, on_finish,
user_data);
ev->base.data.client_metadata_read.count = count;
ev->base.data.client_metadata_read.elements = elements;
end_op_locked(cc, GRPC_CLIENT_METADATA_READ);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -223,14 +221,14 @@ void grpc_cq_end_finished(grpc_completion_queue *cc, void *tag, grpc_call *call,
grpc_metadata *metadata_elements,
size_t metadata_count) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_FINISHED, tag, call, on_finish, user_data);
ev->base.data.finished.status = status;
ev->base.data.finished.details = details;
ev->base.data.finished.metadata_count = metadata_count;
ev->base.data.finished.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_FINISHED);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
@@ -239,7 +237,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
gpr_timespec deadline, size_t metadata_count,
grpc_metadata *metadata_elements) {
event *ev;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
ev = add_locked(cc, GRPC_SERVER_RPC_NEW, tag, call, on_finish, user_data);
ev->base.data.server_rpc_new.method = method;
ev->base.data.server_rpc_new.host = host;
@@ -247,7 +245,7 @@ void grpc_cq_end_new_rpc(grpc_completion_queue *cc, void *tag, grpc_call *call,
ev->base.data.server_rpc_new.metadata_count = metadata_count;
ev->base.data.server_rpc_new.metadata_elements = metadata_elements;
end_op_locked(cc, GRPC_SERVER_RPC_NEW);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
/* Create a GRPC_QUEUE_SHUTDOWN event without queuing it anywhere */
@@ -264,7 +262,7 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if (cc->queue != NULL) {
gpr_uintptr bucket;
@@ -290,16 +288,15 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -337,7 +334,7 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
gpr_timespec deadline) {
event *ev = NULL;
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
for (;;) {
if ((ev = pluck_event(cc, tag))) {
break;
@@ -346,16 +343,15 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
ev = create_shutdown_event();
break;
}
- if (cc->allow_polling && grpc_pollset_work(&cc->pollset, deadline)) {
+ if (cc->allow_polling && grpc_iomgr_work(deadline)) {
continue;
}
- if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
- GRPC_POLLSET_MU(&cc->pollset), deadline)) {
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ if (gpr_cv_wait(&grpc_iomgr_cv, &grpc_iomgr_mu, deadline)) {
+ gpr_mu_unlock(&grpc_iomgr_mu);
return NULL;
}
}
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_unlock(&grpc_iomgr_mu);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
return &ev->base;
}
@@ -364,11 +360,11 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
to zero here, then enter shutdown mode and wake up any waiters */
void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
if (gpr_unref(&cc->refs)) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_mu_lock(&grpc_iomgr_mu);
GPR_ASSERT(!cc->shutdown);
cc->shutdown = 1;
- gpr_cv_broadcast(GRPC_POLLSET_CV(&cc->pollset));
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ gpr_cv_broadcast(&grpc_iomgr_cv);
+ gpr_mu_unlock(&grpc_iomgr_mu);
}
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index aa544a97f2..3829e7aa8f 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -52,7 +52,7 @@ typedef enum { PENDING_START, ALL_CALLS, CALL_LIST_COUNT } call_list;
typedef struct listener {
void *arg;
- void (*start)(grpc_server *server, void *arg, grpc_pollset *pollset);
+ void (*start)(grpc_server *server, void *arg);
void (*destroy)(grpc_server *server, void *arg);
struct listener *next;
} listener;
@@ -192,7 +192,7 @@ static void orphan_channel(channel_data *chand) {
chand->next = chand->prev = chand;
}
-static void finish_destroy_channel(void *cd, int success) {
+static void finish_destroy_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_server *server = chand->server;
/*gpr_log(GPR_INFO, "destroy channel %p", chand->channel);*/
@@ -247,7 +247,7 @@ static void start_new_rpc(grpc_call_element *elem) {
gpr_mu_unlock(&server->mu);
}
-static void kill_zombie(void *elem, int success) {
+static void kill_zombie(void *elem, grpc_iomgr_cb_status status) {
grpc_call_destroy(grpc_call_from_top_element(elem));
}
@@ -336,7 +336,7 @@ static void channel_op(grpc_channel_element *elem,
}
}
-static void finish_shutdown_channel(void *cd, int success) {
+static void finish_shutdown_channel(void *cd, grpc_iomgr_cb_status status) {
channel_data *chand = cd;
grpc_channel_op op;
op.type = GRPC_CHANNEL_DISCONNECT;
@@ -468,7 +468,7 @@ void grpc_server_start(grpc_server *server) {
listener *l;
for (l = server->listeners; l; l = l->next) {
- l->start(server, l->arg, grpc_cq_pollset(server->cq));
+ l->start(server, l->arg);
}
}
@@ -596,8 +596,7 @@ void grpc_server_destroy(grpc_server *server) {
}
void grpc_server_add_listener(grpc_server *server, void *arg,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg)) {
listener *l = gpr_malloc(sizeof(listener));
l->arg = arg;
diff --git a/src/core/surface/server.h b/src/core/surface/server.h
index 61292ebe4e..f0773ab9d5 100644
--- a/src/core/surface/server.h
+++ b/src/core/surface/server.h
@@ -47,8 +47,7 @@ grpc_server *grpc_server_create_from_filters(grpc_completion_queue *cq,
/* Add a listener to the server: when the server starts, it will call start,
and when it shuts down, it will call destroy */
void grpc_server_add_listener(grpc_server *server, void *listener,
- void (*start)(grpc_server *server, void *arg,
- grpc_pollset *pollset),
+ void (*start)(grpc_server *server, void *arg),
void (*destroy)(grpc_server *server, void *arg));
/* Setup a transport - creates a channel stack, binds the transport to the
diff --git a/src/core/surface/server_chttp2.c b/src/core/surface/server_chttp2.c
index a0961bd449..a5fdd03774 100644
--- a/src/core/surface/server_chttp2.c
+++ b/src/core/surface/server_chttp2.c
@@ -59,9 +59,9 @@ static void new_transport(void *server, grpc_endpoint *tcp) {
}
/* Server callback: start listening on our ports */
-static void start(grpc_server *server, void *tcpp, grpc_pollset *pollset) {
+static void start(grpc_server *server, void *tcpp) {
grpc_tcp_server *tcp = tcpp;
- grpc_tcp_server_start(tcp, pollset, new_transport, server);
+ grpc_tcp_server_start(tcp, new_transport, server);
}
/* Server callback: destroy the tcp listener (so we don't generate further