diff options
author | 2015-05-04 14:53:51 -0700 | |
---|---|---|
committer | 2015-05-04 14:53:51 -0700 | |
commit | 64be9f7a30a4bcb9ce3647f11ba9e06994aa3bb7 (patch) | |
tree | 42a4af35a2fe0f3a79573ff37130fd6b74c55cb9 /src/core | |
parent | c112d146a2dcc5e90d5f5cca10f55f212f9492c6 (diff) |
C Core API cleanup.
Simplify grpc_event into something that can be non-heap allocated.
Deprecate grpc_event_finish.
Remove grpc_op_error - use an int as this is more idiomatic C style.
Diffstat (limited to 'src/core')
-rw-r--r-- | src/core/surface/call.c | 81 | ||||
-rw-r--r-- | src/core/surface/call.h | 3 | ||||
-rw-r--r-- | src/core/surface/completion_queue.c | 93 | ||||
-rw-r--r-- | src/core/surface/completion_queue.h | 14 | ||||
-rw-r--r-- | src/core/surface/event_string.c | 26 | ||||
-rw-r--r-- | src/core/surface/server.c | 28 |
6 files changed, 85 insertions, 160 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index 070be1b25a..3b8d514b4f 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -62,7 +62,7 @@ typedef enum { typedef struct { grpc_ioreq_completion_func on_complete; void *user_data; - grpc_op_error status; + int success; } completed_request; /* See request_set in grpc_call below for a description */ @@ -74,7 +74,7 @@ typedef struct { typedef struct { /* Overall status of the operation: starts OK, may degrade to non-OK */ - grpc_op_error status; + int success; /* Completion function to call at the end of the operation */ grpc_ioreq_completion_func on_complete; void *user_data; @@ -239,7 +239,6 @@ struct grpc_call { y = temp; \ } while (0) -static void do_nothing(void *ignored, grpc_op_error also_ignored) {} static void set_deadline_alarm(grpc_call *call, gpr_timespec deadline); static void call_on_done_recv(void *call, int success); static void call_on_done_send(void *call, int success); @@ -460,7 +459,7 @@ static void unlock(grpc_call *call) { if (completing_requests > 0) { for (i = 0; i < completing_requests; i++) { - completed_requests[i].on_complete(call, completed_requests[i].status, + completed_requests[i].on_complete(call, completed_requests[i].success, completed_requests[i].user_data); } lock(call); @@ -520,7 +519,7 @@ no_details: } static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, - grpc_op_error status) { + int success) { completed_request *cr; gpr_uint8 master_set = call->request_set[op]; reqinfo_master *master; @@ -528,8 +527,8 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, /* ioreq is live: we need to do something */ master = &call->masters[master_set]; master->complete_mask |= 1u << op; - if (status != GRPC_OP_OK) { - master->status = status; + if (!success) { + master->success = 0; } if (master->complete_mask == master->need_mask) { for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) { @@ -540,7 +539,7 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, switch ((grpc_ioreq_op)i) { case GRPC_IOREQ_RECV_MESSAGE: case GRPC_IOREQ_SEND_MESSAGE: - if (master->status == GRPC_OP_OK) { + if (master->success) { call->request_set[i] = REQSET_EMPTY; } else { call->write_state = WRITE_STATE_WRITE_CLOSED; @@ -575,33 +574,31 @@ static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op, } } cr = &call->completed_requests[call->num_completed_requests++]; - cr->status = master->status; + cr->success = master->success; cr->on_complete = master->on_complete; cr->user_data = master->user_data; } } -static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, - grpc_op_error status) { +static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) { if (is_op_live(call, op)) { - finish_live_ioreq_op(call, op, status); + finish_live_ioreq_op(call, op, success); } } static void call_on_done_send(void *pc, int success) { grpc_call *call = pc; - grpc_op_error error = success ? GRPC_OP_OK : GRPC_OP_ERROR; lock(call); if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success); } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, error); + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success); } if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) { - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, error); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, error); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); } call->last_send_contains = 0; call->sending = 0; @@ -720,12 +717,12 @@ static void call_on_done_recv(void *pc, int success) { } finish_read_ops(call); } else { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0); } call->recv_ops.nops = 0; unlock(call); @@ -877,7 +874,7 @@ static void finish_read_ops(grpc_call *call) { (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message = grpc_bbq_pop(&call->incoming_queue))); if (!empty) { - finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1); empty = grpc_bbq_empty(&call->incoming_queue); } } else { @@ -887,19 +884,19 @@ static void finish_read_ops(grpc_call *call) { switch (call->read_state) { case READ_STATE_STREAM_CLOSED: if (empty) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1); } /* fallthrough */ case READ_STATE_READ_CLOSED: if (empty) { - finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1); } - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, GRPC_OP_OK); - finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1); + finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1); + finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1); /* fallthrough */ case READ_STATE_GOT_INITIAL_METADATA: - finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1); /* fallthrough */ case READ_STATE_INITIAL: /* do nothing */ @@ -910,13 +907,13 @@ static void finish_read_ops(grpc_call *call) { static void early_out_write_ops(grpc_call *call) { switch (call->write_state) { case WRITE_STATE_WRITE_CLOSED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, GRPC_OP_ERROR); - finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, GRPC_OP_OK); + finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0); + finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1); /* fallthrough */ case WRITE_STATE_STARTED: - finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, GRPC_OP_ERROR); + finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0); /* fallthrough */ case WRITE_STATE_INITIAL: /* do nothing */ @@ -957,7 +954,7 @@ static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs, } master = &call->masters[set]; - master->status = GRPC_OP_OK; + master->success = 1; master->need_mask = have_ops; master->complete_mask = 0; master->on_complete = completion; @@ -1144,8 +1141,8 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { *(grpc_status_code *)dest = (status != GRPC_STATUS_OK); } -static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { - grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); +static void finish_batch(grpc_call *call, int success, void *tag) { + grpc_cq_end_op(call->cq, tag, call, 1); } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, @@ -1159,8 +1156,8 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag); if (nops == 0) { - grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); - grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); + grpc_cq_begin_op(call->cq, call); + grpc_cq_end_op(call->cq, tag, call, 1); return GRPC_CALL_OK; } @@ -1251,7 +1248,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, } } - grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); + grpc_cq_begin_op(call->cq, call); return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, tag); diff --git a/src/core/surface/call.h b/src/core/surface/call.h index 2d4c7f61e3..30d9c868a3 100644 --- a/src/core/surface/call.h +++ b/src/core/surface/call.h @@ -80,8 +80,7 @@ typedef struct { grpc_ioreq_data data; } grpc_ioreq; -typedef void (*grpc_ioreq_completion_func)(grpc_call *call, - grpc_op_error status, +typedef void (*grpc_ioreq_completion_func)(grpc_call *call, int success, void *user_data); grpc_call *grpc_call_create(grpc_channel *channel, grpc_completion_queue *cq, diff --git a/src/core/surface/completion_queue.c b/src/core/surface/completion_queue.c index 3e9031807e..2f1d81ee84 100644 --- a/src/core/surface/completion_queue.c +++ b/src/core/surface/completion_queue.c @@ -51,8 +51,6 @@ function (on_finish) that is hidden from outside this module */ typedef struct event { grpc_event base; - grpc_event_finish_func on_finish; - void *on_finish_user_data; struct event *queue_next; struct event *queue_prev; struct event *bucket_next; @@ -78,16 +76,8 @@ struct grpc_completion_queue { event *queue; /* Fixed size chained hash table of events for pluck() */ event *buckets[NUM_TAG_BUCKETS]; - -#ifndef NDEBUG - /* Debug support: track which operations are in flight at any given time */ - gpr_atm pending_op_count[GRPC_COMPLETION_DO_NOT_USE]; -#endif }; -/* Default do-nothing on_finish function */ -static void null_on_finish(void *user_data, grpc_op_error error) {} - grpc_completion_queue *grpc_completion_queue_create(void) { grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue)); memset(cc, 0, sizeof(*cc)); @@ -124,15 +114,11 @@ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc) { members can be filled in. Requires GRPC_POLLSET_MU(&cc->pollset) locked. */ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, - void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data) { + void *tag, grpc_call *call) { event *ev = gpr_malloc(sizeof(event)); gpr_uintptr bucket = ((gpr_uintptr)tag) % NUM_TAG_BUCKETS; ev->base.type = type; ev->base.tag = tag; - ev->base.call = call; - ev->on_finish = on_finish ? on_finish : null_on_finish; - ev->on_finish_user_data = user_data; if (cc->queue == NULL) { cc->queue = ev->queue_next = ev->queue_prev = ev; } else { @@ -152,22 +138,15 @@ static event *add_locked(grpc_completion_queue *cc, grpc_completion_type type, return ev; } -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, - grpc_completion_type type) { +void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call) { gpr_ref(&cc->refs); if (call) GRPC_CALL_INTERNAL_REF(call, "cq"); -#ifndef NDEBUG - gpr_atm_no_barrier_fetch_add(&cc->pending_op_count[type], 1); -#endif } /* Signal the end of an operation - if this is the last waiting-to-be-queued event, then enter shutdown mode */ static void end_op_locked(grpc_completion_queue *cc, grpc_completion_type type) { -#ifndef NDEBUG - GPR_ASSERT(gpr_atm_full_fetch_add(&cc->pending_op_count[type], -1) > 0); -#endif if (gpr_unref(&cc->refs)) { GPR_ASSERT(!cc->shutdown); GPR_ASSERT(cc->shutdown_called); @@ -176,20 +155,12 @@ static void end_op_locked(grpc_completion_queue *cc, } } -void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag) { - gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - add_locked(cc, GRPC_SERVER_SHUTDOWN, tag, NULL, NULL, NULL); - end_op_locked(cc, GRPC_SERVER_SHUTDOWN); - gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); -} - void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error) { + int success) { event *ev; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); - ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call, on_finish, user_data); - ev->base.data.op_complete = error; + ev = add_locked(cc, GRPC_OP_COMPLETE, tag, call); + ev->base.success = success; end_op_locked(cc, GRPC_OP_COMPLETE); gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); } @@ -198,15 +169,14 @@ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, static event *create_shutdown_event(void) { event *ev = gpr_malloc(sizeof(event)); ev->base.type = GRPC_QUEUE_SHUTDOWN; - ev->base.call = NULL; ev->base.tag = NULL; - ev->on_finish = null_on_finish; return ev; } -grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, - gpr_timespec deadline) { +grpc_event grpc_completion_queue_next(grpc_completion_queue *cc, + gpr_timespec deadline) { event *ev = NULL; + grpc_event ret; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { @@ -240,12 +210,16 @@ grpc_event *grpc_completion_queue_next(grpc_completion_queue *cc, if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), GRPC_POLLSET_MU(&cc->pollset), deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - return NULL; + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + return ret; } } gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); - return &ev->base; + ret = ev->base; + gpr_free(ev); + GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret); + return ret; } static event *pluck_event(grpc_completion_queue *cc, void *tag) { @@ -277,9 +251,10 @@ static event *pluck_event(grpc_completion_queue *cc, void *tag) { return NULL; } -grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, - gpr_timespec deadline) { +grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, + gpr_timespec deadline) { event *ev = NULL; + grpc_event ret; gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset)); for (;;) { @@ -296,12 +271,16 @@ grpc_event *grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag, if (gpr_cv_wait(GRPC_POLLSET_CV(&cc->pollset), GRPC_POLLSET_MU(&cc->pollset), deadline)) { gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); - return NULL; + memset(&ret, 0, sizeof(ret)); + ret.type = GRPC_QUEUE_TIMEOUT; + return ret; } } gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset)); + ret = ev->base; + gpr_free(ev); GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ev->base); - return &ev->base; + return ret; } /* Shutdown simply drops a ref that we reserved at creation time; if we drop @@ -324,30 +303,6 @@ void grpc_completion_queue_destroy(grpc_completion_queue *cc) { grpc_cq_internal_unref(cc); } -void grpc_event_finish(grpc_event *base) { - event *ev = (event *)base; - ev->on_finish(ev->on_finish_user_data, GRPC_OP_OK); - if (ev->base.call) { - GRPC_CALL_INTERNAL_UNREF(ev->base.call, "cq", 1); - } - gpr_free(ev); -} - -void grpc_cq_dump_pending_ops(grpc_completion_queue *cc) { -#ifndef NDEBUG - char tmp[GRPC_COMPLETION_DO_NOT_USE * (1 + GPR_LTOA_MIN_BUFSIZE)]; - char *p = tmp; - int i; - - for (i = 0; i < GRPC_COMPLETION_DO_NOT_USE; i++) { - *p++ = ' '; - p += gpr_ltoa(cc->pending_op_count[i], p); - } - - gpr_log(GPR_INFO, "pending ops:%s", tmp); -#endif -} - grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) { return &cc->pollset; } diff --git a/src/core/surface/completion_queue.h b/src/core/surface/completion_queue.h index a0d7eeaac6..239bfe4aea 100644 --- a/src/core/surface/completion_queue.h +++ b/src/core/surface/completion_queue.h @@ -39,17 +39,12 @@ #include "src/core/iomgr/pollset.h" #include <grpc/grpc.h> -/* A finish func is executed whenever the event consumer calls - grpc_event_finish */ -typedef void (*grpc_event_finish_func)(void *user_data, grpc_op_error error); - void grpc_cq_internal_ref(grpc_completion_queue *cc); void grpc_cq_internal_unref(grpc_completion_queue *cc); /* Flag that an operation is beginning: the completion channel will not finish shutdown until a corrensponding grpc_cq_end_* call is made */ -void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, - grpc_completion_type type); +void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call); /* grpc_cq_end_* functions pair with a grpc_cq_begin_op @@ -64,16 +59,11 @@ void grpc_cq_begin_op(grpc_completion_queue *cc, grpc_call *call, /* Queue a GRPC_OP_COMPLETED operation */ void grpc_cq_end_op(grpc_completion_queue *cc, void *tag, grpc_call *call, - grpc_event_finish_func on_finish, void *user_data, - grpc_op_error error); - -void grpc_cq_end_server_shutdown(grpc_completion_queue *cc, void *tag); + int success); /* disable polling for some tests */ void grpc_completion_queue_dont_poll_test_only(grpc_completion_queue *cc); -void grpc_cq_dump_pending_ops(grpc_completion_queue *cc); - grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc); void grpc_cq_hack_spin_pollset(grpc_completion_queue *cc); diff --git a/src/core/surface/event_string.c b/src/core/surface/event_string.c index 30bdff6b85..448bb1162b 100644 --- a/src/core/surface/event_string.c +++ b/src/core/surface/event_string.c @@ -40,23 +40,15 @@ static void addhdr(gpr_strvec *buf, grpc_event *ev) { char *tmp; - gpr_asprintf(&tmp, "tag:%p call:%p", ev->tag, (void *)ev->call); + gpr_asprintf(&tmp, "tag:%p", ev->tag); gpr_strvec_add(buf, tmp); } -static const char *errstr(grpc_op_error err) { - switch (err) { - case GRPC_OP_OK: - return "OK"; - case GRPC_OP_ERROR: - return "ERROR"; - } - return "UNKNOWN_UNKNOWN"; -} +static const char *errstr(int success) { return success ? "OK" : "ERROR"; } -static void adderr(gpr_strvec *buf, grpc_op_error err) { +static void adderr(gpr_strvec *buf, int success) { char *tmp; - gpr_asprintf(&tmp, " err=%s", errstr(err)); + gpr_asprintf(&tmp, " %s", errstr(success)); gpr_strvec_add(buf, tmp); } @@ -69,8 +61,8 @@ char *grpc_event_string(grpc_event *ev) { gpr_strvec_init(&buf); switch (ev->type) { - case GRPC_SERVER_SHUTDOWN: - gpr_strvec_add(&buf, gpr_strdup("SERVER_SHUTDOWN")); + case GRPC_QUEUE_TIMEOUT: + gpr_strvec_add(&buf, gpr_strdup("QUEUE_TIMEOUT")); break; case GRPC_QUEUE_SHUTDOWN: gpr_strvec_add(&buf, gpr_strdup("QUEUE_SHUTDOWN")); @@ -78,11 +70,7 @@ char *grpc_event_string(grpc_event *ev) { case GRPC_OP_COMPLETE: gpr_strvec_add(&buf, gpr_strdup("OP_COMPLETE: ")); addhdr(&buf, ev); - adderr(&buf, ev->data.op_complete); - break; - case GRPC_COMPLETION_DO_NOT_USE: - gpr_strvec_add(&buf, gpr_strdup("DO_NOT_USE (this is a bug)")); - addhdr(&buf, ev); + adderr(&buf, ev->success); break; } diff --git a/src/core/surface/server.c b/src/core/surface/server.c index 01644b4471..0d81170dd8 100644 --- a/src/core/surface/server.c +++ b/src/core/surface/server.c @@ -188,8 +188,6 @@ struct call_data { #define SERVER_FROM_CALL_ELEM(elem) \ (((channel_data *)(elem)->channel_data)->server) -static void do_nothing(void *unused, grpc_op_error ignored) {} - static void begin_call(grpc_server *server, call_data *calld, requested_call *rc); static void fail_call(grpc_server *server, requested_call *rc); @@ -538,8 +536,8 @@ static void destroy_call_elem(grpc_call_element *elem) { if (chand->server->shutdown && chand->server->lists[ALL_CALLS] == NULL) { for (i = 0; i < chand->server->num_shutdown_tags; i++) { for (j = 0; j < chand->server->cq_count; j++) { - grpc_cq_end_server_shutdown(chand->server->cqs[j], - chand->server->shutdown_tags[i]); + grpc_cq_end_op(chand->server->cqs[j], chand->server->shutdown_tags[i], + NULL, 1); } } } @@ -817,7 +815,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, gpr_mu_lock(&server->mu); if (have_shutdown_tag) { for (i = 0; i < server->cq_count; i++) { - grpc_cq_begin_op(server->cqs[i], NULL, GRPC_SERVER_SHUTDOWN); + grpc_cq_begin_op(server->cqs[i], NULL); } server->shutdown_tags = gpr_realloc(server->shutdown_tags, @@ -867,7 +865,7 @@ static void shutdown_internal(grpc_server *server, gpr_uint8 have_shutdown_tag, if (server->lists[ALL_CALLS] == NULL) { for (i = 0; i < server->num_shutdown_tags; i++) { for (j = 0; j < server->cq_count; j++) { - grpc_cq_end_server_shutdown(server->cqs[j], server->shutdown_tags[i]); + grpc_cq_end_op(server->cqs[j], server->shutdown_tags[i], NULL, 1); } } } @@ -1018,7 +1016,7 @@ grpc_call_error grpc_server_request_call(grpc_server *server, grpc_call **call, grpc_completion_queue *cq_bind, void *tag) { requested_call rc; - grpc_cq_begin_op(server->unregistered_cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(server->unregistered_cq, NULL); rc.type = BATCH_CALL; rc.tag = tag; rc.data.batch.cq_bind = cq_bind; @@ -1034,7 +1032,7 @@ grpc_call_error grpc_server_request_registered_call( grpc_completion_queue *cq_bind, void *tag) { requested_call rc; registered_method *registered_method = rm; - grpc_cq_begin_op(registered_method->cq, NULL, GRPC_OP_COMPLETE); + grpc_cq_begin_op(registered_method->cq, NULL); rc.type = REGISTERED_CALL; rc.tag = tag; rc.data.registered.cq_bind = cq_bind; @@ -1046,10 +1044,9 @@ grpc_call_error grpc_server_request_registered_call( return queue_call_request(server, &rc); } -static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, +static void publish_registered_or_batch(grpc_call *call, int success, void *tag); -static void publish_was_not_set(grpc_call *call, grpc_op_error status, - void *tag) { +static void publish_was_not_set(grpc_call *call, int success, void *tag) { abort(); } @@ -1118,24 +1115,23 @@ static void fail_call(grpc_server *server, requested_call *rc) { case BATCH_CALL: *rc->data.batch.call = NULL; rc->data.batch.initial_metadata->count = 0; - grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, do_nothing, NULL, - GRPC_OP_ERROR); + grpc_cq_end_op(server->unregistered_cq, rc->tag, NULL, 0); break; case REGISTERED_CALL: *rc->data.registered.call = NULL; rc->data.registered.initial_metadata->count = 0; grpc_cq_end_op(rc->data.registered.registered_method->cq, rc->tag, NULL, - do_nothing, NULL, GRPC_OP_ERROR); + 0); break; } } -static void publish_registered_or_batch(grpc_call *call, grpc_op_error status, +static void publish_registered_or_batch(grpc_call *call, int success, void *tag) { grpc_call_element *elem = grpc_call_stack_element(grpc_call_get_call_stack(call), 0); call_data *calld = elem->call_data; - grpc_cq_end_op(calld->cq_new, tag, call, do_nothing, NULL, status); + grpc_cq_end_op(calld->cq_new, tag, call, success); } const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) { |