aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core
diff options
context:
space:
mode:
authorGravatar Craig Tiller <ctiller@google.com>2015-05-04 14:53:51 -0700
committerGravatar Craig Tiller <ctiller@google.com>2015-05-04 14:53:51 -0700
commit64be9f7a30a4bcb9ce3647f11ba9e06994aa3bb7 (patch)
tree42a4af35a2fe0f3a79573ff37130fd6b74c55cb9 /src/core
parentc112d146a2dcc5e90d5f5cca10f55f212f9492c6 (diff)
C Core API cleanup.
Simplify grpc_event into something that can be non-heap allocated. Deprecate grpc_event_finish. Remove grpc_op_error - use an int as this is more idiomatic C style.
Diffstat (limited to 'src/core')
-rw-r--r--src/core/surface/call.c81
-rw-r--r--src/core/surface/call.h3
-rw-r--r--src/core/surface/completion_queue.c93
-rw-r--r--src/core/surface/completion_queue.h14
-rw-r--r--src/core/surface/event_string.c26
-rw-r--r--src/core/surface/server.c28
6 files changed, 85 insertions, 160 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 070be1b25a..3b8d514b4f 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -62,7 +62,7 @@ typedef enum {
typedef struct {
grpc_ioreq_completion_func on_complete;
void *user_data;
- grpc_op_error status;
+ int success;
} completed_request;
/* See request_set in grpc_call below for a description */
@@ -74,7 +74,7 @@ typedef struct {
typedef struct {
/* Overall status of the operation: starts OK, may degrade to
non-OK */
- grpc_op_error status;
+ int success;
/* Completion function to call at the end of the operation */
grpc_ioreq_completion_func on_complete;
void *user_data;
@@ -239,7 +239,6 @@ struct grpc_call {
y = temp; \
} while (0)
-static void do_nothing(void *ignored, grpc_op_error also_ignored) {}
static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline);
static void call_on_done_recv(void *call, int success);
static void call_on_done_send(void *call, int success);
@@ -460,7 +459,7 @@ static void unlock(grpc_call *call) {
if (completing_requests > 0) {
for (i = 0; i < completing_requests; i++) {
- completed_requests[i].on_complete(call, completed_requests[i].status,
+ completed_requests[i].on_complete(call, completed_requests[i].success,
completed_requests[i].user_data);
}
lock(call);
@@ -520,7 +519,7 @@ no_details:
}
static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
- grpc_op_error status) {
+ int success) {
completed_request *cr;
gpr_uint8 master_set = call->request_set[op];
reqinfo_master *master;
@@ -528,8 +527,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
/* ioreq is live: we need to do something */
master = &call->masters[master_set];
master->complete_mask |= 1u << op;
- if (status != GRPC_OP_OK) {
- master->status = status;
+ if (!success) {
+ master->success = 0;
}
if (master->complete_mask == master->need_mask) {
for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
@@ -540,7 +539,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
switch ((grpc_ioreq_op)i) {
case GRPC_IOREQ_RECV_MESSAGE:
case GRPC_IOREQ_SEND_MESSAGE:
- if (master->status == GRPC_OP_OK) {
+ if (master->success) {
call->request_set[i] = REQSET_EMPTY;
} else {
call->write_state = WRITE_STATE_WRITE_CLOSED;
@@ -575,33 +574,31 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
}
}
cr = &call->completed_requests[call->num_completed_requests++];
- cr->status = master->status;
+ cr->success = master->success;
cr->on_complete = master->on_complete;
cr->user_data = master->user_data;
}
}
-static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op,
- grpc_op_error status) {
+static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
if (is_op_live(call, op)) {
- finish_live_ioreq_op(call, op, status);
+ finish_live_ioreq_op(call, op, success);
}
}
static void call_on_done_send(void *pc, int success) {
grpc_call *call = pc;
- grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR;
lock(call);
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
}
if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
}
call->last_send_contains = 0;
call->sending = 0;
@@ -720,12 +717,12 @@ static void call_on_done_recv(void *pc, int success) {
}
finish_read_ops(call);
} else {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
}
call->recv_ops.nops = 0;
unlock(call);
@@ -877,7 +874,7 @@ static void finish_read_ops(grpc_call *call) {
(NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
grpc_bbq_pop(&call->incoming_queue)));
if (!empty) {
- finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
empty = grpc_bbq_empty(&call->incoming_queue);
}
} else {
@@ -887,19 +884,19 @@ static void finish_read_ops(grpc_call *call) {
switch (call->read_state) {
case READ_STATE_STREAM_CLOSED:
if (empty) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
}
/* fallthrough */
case READ_STATE_READ_CLOSED:
if (empty) {
- finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
}
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK);
- finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
/* fallthrough */
case READ_STATE_GOT_INITIAL_METADATA:
- finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
/* fallthrough */
case READ_STATE_INITIAL:
/* do nothing */
@@ -910,13 +907,13 @@ static void finish_read_ops(grpc_call *call) {
static void early_out_write_ops(grpc_call *call) {
switch (call->write_state) {
case WRITE_STATE_WRITE_CLOSED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR);
- finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
/* fallthrough */
case WRITE_STATE_STARTED:
- finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR);
+ finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
/* fallthrough */
case WRITE_STATE_INITIAL:
/* do nothing */
@@ -957,7 +954,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
}
master = &call->masters[set];
- master->status = GRPC_OP_OK;
+ master->success = 1;
master->need_mask = have_ops;
master->complete_mask = 0;
master->on_complete = completion;
@@ -1144,8 +1141,8 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
*(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
}
-static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
- grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+static void finish_batch(grpc_call *call, int success, void *tag) {
+ grpc_cq_end_op(call->cq, tag, call, 1);
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
@@ -1159,8 +1156,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
if (nops == 0) {
- grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
- grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+ grpc_cq_begin_op(call->cq, call);
+ grpc_cq_end_op(call->cq, tag, call, 1);
return GRPC_CALL_OK;
}
@@ -1251,7 +1248,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
}
}
- grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(call->cq, call);
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
tag);
diff --git a/src/core/surface/call.h b/src/core/surface/call.h
index 2d4c7f61e3..30d9c868a3 100644
--- a/src/core/surface/call.h
+++ b/src/core/surface/call.h
@@ -80,8 +80,7 @@ typedef struct {
grpc_ioreq_data data;
} grpc_ioreq;
-typedef void (*grpc_ioreq_completion_func)(grpc_call *call,
- grpc_op_error status,
+typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success,
void *user_data);
grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq,
diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c
index 3e9031807e..2f1d81ee84 100644
--- a/src/core/surface/completion_queue.c
+++ b/src/core/surface/completion_queue.c
@@ -51,8 +51,6 @@
function (on_finish) that is hidden from outside this module */
typedef struct event {
grpc_event base;
- grpc_event_finish_func on_finish;
- void *on_finish_user_data;
struct event *queue_next;
struct event *queue_prev;
struct event *bucket_next;
@@ -78,16 +76,8 @@ struct grpc_completion_queue {
event *queue;
/* Fixed size chained hash table of events for pluck() */
event *buckets[NUM_TAG_BUCKETS];
-
-#ifndef NDEBUG
- /* Debug support: track which operations are in flight at any given time */
- gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE];
-#endif
};
-/* Default do-nothing on_finish function */
-static void null_on_finish(void *user_data, grpc_op_error error) {}
-
grpc_completion_queue *grpc_completion_queue_create(void) {
grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
memset(cc, 0, sizeof(*cc));
@@ -124,15 +114,11 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) {
members can be filled in.
Requires GRPC_POLLSET_MU(&cc->pollset) locked. */
static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
- void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data) {
+ void *tag, grpc_call *call) {
event *ev = gpr_malloc(sizeof(event));
gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS;
ev->base.type = type;
ev->base.tag = tag;
- ev->base.call = call;
- ev->on_finish = on_finish ? on_finish : null_on_finish;
- ev->on_finish_user_data = user_data;
if (cc->queue == NULL) {
cc->queue = ev->queue_next = ev->queue_prev = ev;
} else {
@@ -152,22 +138,15 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type,
return ev;
}
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
- grpc_completion_type type) {
+void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) {
gpr_ref(&cc->refs);
if (call) GRPC_CALL_INTERNAL_REF(call, "cq");
-#ifndef NDEBUG
- gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1);
-#endif
}
/* Signal the end of an operation - if this is the last waiting-to-be-queued
event, then enter shutdown mode */
static void end_op_locked(grpc_completion_queue *cc,
grpc_completion_type type) {
-#ifndef NDEBUG
- GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0);
-#endif
if (gpr_unref(&cc->refs)) {
GPR_ASSERT(!cc->shutdown);
GPR_ASSERT(cc->shutdown_called);
@@ -176,20 +155,12 @@ static void end_op_locked(grpc_completion_queue *cc,
}
}
-void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) {
- gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL);
- end_op_locked(cc, GRPC_SERVER_SHUTDOWN);
- gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
-}
-
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_op_error error) {
+ int success) {
event *ev;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
- ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data);
- ev->base.data.op_complete = error;
+ ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call);
+ ev->base.success = success;
end_op_locked(cc, GRPC_OP_COMPLETE);
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
}
@@ -198,15 +169,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
static event *create_shutdown_event(void) {
event *ev = gpr_malloc(sizeof(event));
ev->base.type = GRPC_QUEUE_SHUTDOWN;
- ev->base.call = NULL;
ev->base.tag = NULL;
- ev->on_finish = null_on_finish;
return ev;
}
-grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
- gpr_timespec deadline) {
+grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
+ gpr_timespec deadline) {
event *ev = NULL;
+ grpc_event ret;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@@ -240,12 +210,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc,
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- return NULL;
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
- return &ev->base;
+ ret = ev->base;
+ gpr_free(ev);
+ GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
+ return ret;
}
static event *pluck_event(grpc_completion_queue *cc, void *tag) {
@@ -277,9 +251,10 @@ static event *pluck_event(grpc_completion_queue *cc, void *tag) {
return NULL;
}
-grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
- gpr_timespec deadline) {
+grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
+ gpr_timespec deadline) {
event *ev = NULL;
+ grpc_event ret;
gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
for (;;) {
@@ -296,12 +271,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset),
GRPC_POLLSET_MU(&cc->pollset), deadline)) {
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
- return NULL;
+ memset(&ret, 0, sizeof(ret));
+ ret.type = GRPC_QUEUE_TIMEOUT;
+ return ret;
}
}
gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
+ ret = ev->base;
+ gpr_free(ev);
GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base);
- return &ev->base;
+ return ret;
}
/* Shutdown simply drops a ref that we reserved at creation time; if we drop
@@ -324,30 +303,6 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
grpc_cq_internal_unref(cc);
}
-void grpc_event_finish(grpc_event *base) {
- event *ev = (event *)base;
- ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK);
- if (ev->base.call) {
- GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1);
- }
- gpr_free(ev);
-}
-
-void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) {
-#ifndef NDEBUG
- char tmp[GRPC_COMPLETION_DO_NOT_USE * (1 + GPR_LTOA_MIN_BUFSIZE)];
- char *p = tmp;
- int i;
-
- for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) {
- *p++ = ' ';
- p += gpr_ltoa(cc->pending_op_count[i], p);
- }
-
- gpr_log(GPR_INFO, "pending ops:%s", tmp);
-#endif
-}
-
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
return &cc->pollset;
}
diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h
index a0d7eeaac6..239bfe4aea 100644
--- a/src/core/surface/completion_queue.h
+++ b/src/core/surface/completion_queue.h
@@ -39,17 +39,12 @@
#include "src/core/iomgr/pollset.h"
#include <grpc/grpc.h>
-/* A finish func is executed whenever the event consumer calls
- grpc_event_finish */
-typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error);
-
void grpc_cq_internal_ref(grpc_completion_queue *cc);
void grpc_cq_internal_unref(grpc_completion_queue *cc);
/* Flag that an operation is beginning: the completion channel will not finish
shutdown until a corrensponding grpc_cq_end_* call is made */
-void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
- grpc_completion_type type);
+void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call);
/* grpc_cq_end_* functions pair with a grpc_cq_begin_op
@@ -64,16 +59,11 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call,
/* Queue a GRPC_OP_COMPLETED operation */
void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call,
- grpc_event_finish_func on_finish, void *user_data,
- grpc_op_error error);
-
-void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag);
+ int success);
/* disable polling for some tests */
void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc);
-void grpc_cq_dump_pending_ops(grpc_completion_queue *cc);
-
grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc);
void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc);
diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c
index 30bdff6b85..448bb1162b 100644
--- a/src/core/surface/event_string.c
+++ b/src/core/surface/event_string.c
@@ -40,23 +40,15 @@
static void addhdr(gpr_strvec *buf, grpc_event *ev) {
char *tmp;
- gpr_asprintf(&tmp, "tag:%p call:%p", ev->tag, (void *)ev->call);
+ gpr_asprintf(&tmp, "tag:%p", ev->tag);
gpr_strvec_add(buf, tmp);
}
-static const char *errstr(grpc_op_error err) {
- switch (err) {
- case GRPC_OP_OK:
- return "OK";
- case GRPC_OP_ERROR:
- return "ERROR";
- }
- return "UNKNOWN_UNKNOWN";
-}
+static const char *errstr(int success) { return success ? "OK" : "ERROR"; }
-static void adderr(gpr_strvec *buf, grpc_op_error err) {
+static void adderr(gpr_strvec *buf, int success) {
char *tmp;
- gpr_asprintf(&tmp, " err=%s", errstr(err));
+ gpr_asprintf(&tmp, " %s", errstr(success));
gpr_strvec_add(buf, tmp);
}
@@ -69,8 +61,8 @@ char *grpc_event_string(grpc_event *ev) {
gpr_strvec_init(&buf);
switch (ev->type) {
- case GRPC_SERVER_SHUTDOWN:
- gpr_strvec_add(&buf, gpr_strdup("SERVER_SHUTDOWN"));
+ case GRPC_QUEUE_TIMEOUT:
+ gpr_strvec_add(&buf, gpr_strdup("QUEUE_TIMEOUT"));
break;
case GRPC_QUEUE_SHUTDOWN:
gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN"));
@@ -78,11 +70,7 @@ char *grpc_event_string(grpc_event *ev) {
case GRPC_OP_COMPLETE:
gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: "));
addhdr(&buf, ev);
- adderr(&buf, ev->data.op_complete);
- break;
- case GRPC_COMPLETION_DO_NOT_USE:
- gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)"));
- addhdr(&buf, ev);
+ adderr(&buf, ev->success);
break;
}
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 01644b4471..0d81170dd8 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -188,8 +188,6 @@ struct call_data {
#define SERVER_FROM_CALL_ELEM(elem) \
(((channel_data *)(elem)->channel_data)->server)
-static void do_nothing(void *unused, grpc_op_error ignored) {}
-
static void begin_call(grpc_server *server, call_data *calld,
requested_call *rc);
static void fail_call(grpc_server *server, requested_call *rc);
@@ -538,8 +536,8 @@ static void destroy_call_elem(grpc_call_element *elem) {
if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < chand->server->num_shutdown_tags; i++) {
for (j = 0; j < chand->server->cq_count; j++) {
- grpc_cq_end_server_shutdown(chand->server->cqs[j],
- chand->server->shutdown_tags[i]);
+ grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i],
+ NULL, 1);
}
}
}
@@ -817,7 +815,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
gpr_mu_lock(&server->mu);
if (have_shutdown_tag) {
for (i = 0; i < server->cq_count; i++) {
- grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN);
+ grpc_cq_begin_op(server->cqs[i], NULL);
}
server->shutdown_tags =
gpr_realloc(server->shutdown_tags,
@@ -867,7 +865,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag,
if (server->lists[ALL_CALLS] == NULL) {
for (i = 0; i < server->num_shutdown_tags; i++) {
for (j = 0; j < server->cq_count; j++) {
- grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]);
+ grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1);
}
}
}
@@ -1018,7 +1016,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call,
grpc_completion_queue *cq_bind,
void *tag) {
requested_call rc;
- grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(server->unregistered_cq, NULL);
rc.type = BATCH_CALL;
rc.tag = tag;
rc.data.batch.cq_bind = cq_bind;
@@ -1034,7 +1032,7 @@ grpc_call_error grpc_server_request_registered_call(
grpc_completion_queue *cq_bind, void *tag) {
requested_call rc;
registered_method *registered_method = rm;
- grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE);
+ grpc_cq_begin_op(registered_method->cq, NULL);
rc.type = REGISTERED_CALL;
rc.tag = tag;
rc.data.registered.cq_bind = cq_bind;
@@ -1046,10 +1044,9 @@ grpc_call_error grpc_server_request_registered_call(
return queue_call_request(server, &rc);
}
-static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
+static void publish_registered_or_batch(grpc_call *call, int success,
void *tag);
-static void publish_was_not_set(grpc_call *call, grpc_op_error status,
- void *tag) {
+static void publish_was_not_set(grpc_call *call, int success, void *tag) {
abort();
}
@@ -1118,24 +1115,23 @@ static void fail_call(grpc_server *server, requested_call *rc) {
case BATCH_CALL:
*rc->data.batch.call = NULL;
rc->data.batch.initial_metadata->count = 0;
- grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL,
- GRPC_OP_ERROR);
+ grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, 0);
break;
case REGISTERED_CALL:
*rc->data.registered.call = NULL;
rc->data.registered.initial_metadata->count = 0;
grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL,
- do_nothing, NULL, GRPC_OP_ERROR);
+ 0);
break;
}
}
-static void publish_registered_or_batch(grpc_call *call, grpc_op_error status,
+static void publish_registered_or_batch(grpc_call *call, int success,
void *tag) {
grpc_call_element *elem =
grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
call_data *calld = elem->call_data;
- grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status);
+ grpc_cq_end_op(calld->cq_new, tag, call, success);
}
const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {