diff options
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r-- | src/core/surface/call.c | 333 |
1 files changed, 6 insertions, 327 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c index fc07a19894..070be1b25a 100644 --- a/src/core/surface/call.c +++ b/src/core/surface/call.c @@ -47,9 +47,6 @@ #include <stdlib.h> #include <string.h> -typedef struct legacy_state legacy_state; -static void destroy_legacy_state(legacy_state *ls); - typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state; typedef enum { @@ -226,10 +223,6 @@ struct grpc_call { gpr_slice_buffer incoming_message; gpr_uint32 incoming_message_length; - - /* Data that the legacy api needs to track. To be deleted at some point - soon */ - legacy_state *legacy_state; }; #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1)) @@ -353,9 +346,6 @@ static void destroy_call(void *call, int ignored_success) { } grpc_sopb_destroy(&c->send_ops); grpc_sopb_destroy(&c->recv_ops); - if (c->legacy_state) { - destroy_legacy_state(c->legacy_state); - } grpc_bbq_destroy(&c->incoming_queue); gpr_slice_buffer_destroy(&c->incoming_message); gpr_free(c); @@ -404,12 +394,6 @@ static void set_status_details(grpc_call *call, status_source source, call->status[source].details = status; } -static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) { - if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED; - call->cq = cq; - return GRPC_CALL_OK; -} - static int is_op_live(grpc_call *call, grpc_ioreq_op op) { gpr_uint8 set = call->request_set[op]; reqinfo_master *master; @@ -729,6 +713,10 @@ static void call_on_done_recv(void *pc, int success) { if (call->recv_state == GRPC_STREAM_CLOSED) { GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED); call->read_state = READ_STATE_STREAM_CLOSED; + if (call->have_alarm) { + grpc_alarm_cancel(&call->alarm); + call->have_alarm = 0; + } } finish_read_ops(call); } else { @@ -1157,7 +1145,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) { } static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) { - grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); + grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); } grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, @@ -1172,7 +1160,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, if (nops == 0) { grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE); - grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); + grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK); return GRPC_CALL_OK; } @@ -1268,312 +1256,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops, return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch, tag); } - -/* - * LEGACY API IMPLEMENTATION - * All this code will disappear as soon as wrappings are updated - */ - -struct legacy_state { - gpr_uint8 md_out_buffer; - size_t md_out_count[2]; - size_t md_out_capacity[2]; - grpc_metadata *md_out[2]; - grpc_byte_buffer *msg_out; - - /* input buffers */ - grpc_metadata_array initial_md_in; - grpc_metadata_array trailing_md_in; - - size_t details_capacity; - char *details; - grpc_status_code status; - - char *send_details; - - size_t msg_in_read_idx; - grpc_byte_buffer *msg_in; - - void *finished_tag; -}; - -static legacy_state *get_legacy_state(grpc_call *call) { - if (call->legacy_state == NULL) { - call->legacy_state = gpr_malloc(sizeof(legacy_state)); - memset(call->legacy_state, 0, sizeof(legacy_state)); - } - return call->legacy_state; -} - -static void destroy_legacy_state(legacy_state *ls) { - size_t i, j; - for (i = 0; i < 2; i++) { - for (j = 0; j < ls->md_out_count[i]; j++) { - gpr_free((char *)ls->md_out[i][j].key); - gpr_free((char *)ls->md_out[i][j].value); - } - gpr_free(ls->md_out[i]); - } - gpr_free(ls->initial_md_in.metadata); - gpr_free(ls->trailing_md_in.metadata); - gpr_free(ls->details); - gpr_free(ls->send_details); - gpr_free(ls); -} - -grpc_call_error grpc_call_add_metadata_old(grpc_call *call, - grpc_metadata *metadata, - gpr_uint32 flags) { - legacy_state *ls; - grpc_metadata *mdout; - - lock(call); - ls = get_legacy_state(call); - - if (ls->md_out_count[ls->md_out_buffer] == - ls->md_out_capacity[ls->md_out_buffer]) { - ls->md_out_capacity[ls->md_out_buffer] = - GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2, - ls->md_out_capacity[ls->md_out_buffer] + 8); - ls->md_out[ls->md_out_buffer] = gpr_realloc( - ls->md_out[ls->md_out_buffer], - sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]); - } - mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++]; - mdout->key = gpr_strdup(metadata->key); - mdout->value = gpr_malloc(metadata->value_length); - mdout->value_length = metadata->value_length; - memcpy((char *)mdout->value, metadata->value, metadata->value_length); - - unlock(call); - - return GRPC_CALL_OK; -} - -static void finish_status(grpc_call *call, grpc_op_error status, - void *ignored) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL, - ls->status, ls->details, ls->trailing_md_in.metadata, - ls->trailing_md_in.count); - unlock(call); -} - -static void finish_recv_metadata(grpc_call *call, grpc_op_error status, - void *tag) { - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - if (status == GRPC_OP_OK) { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, - ls->initial_md_in.count, - ls->initial_md_in.metadata); - - } else { - grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0, - NULL); - } - unlock(call); -} - -static void finish_send_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq, - void *metadata_read_tag, - void *finished_tag, gpr_uint32 flags) { - grpc_ioreq reqs[4]; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ); - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) goto done; - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - ls->md_out_buffer++; - err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA; - reqs[0].data.recv_metadata = &ls->initial_md_in; - err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag); - if (err != GRPC_CALL_OK) goto done; - - reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA; - reqs[0].data.recv_metadata = &ls->trailing_md_in; - reqs[1].op = GRPC_IOREQ_RECV_STATUS; - reqs[1].data.recv_status.user_data = &ls->status; - reqs[1].data.recv_status.set_value = set_status_value_directly; - reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS; - reqs[2].data.recv_status_details.details = &ls->details; - reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity; - reqs[3].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 4, finish_status, NULL); - if (err != GRPC_CALL_OK) goto done; - -done: - unlock(call); - return err; -} - -grpc_call_error grpc_call_server_accept_old(grpc_call *call, - grpc_completion_queue *cq, - void *finished_tag) { - grpc_ioreq reqs[2]; - grpc_call_error err; - legacy_state *ls; - - /* inform the completion queue of an incoming operation (corresponding to - finished_tag) */ - grpc_cq_begin_op(cq, call, GRPC_FINISHED); - - lock(call); - ls = get_legacy_state(call); - - err = bind_cq(call, cq); - if (err != GRPC_CALL_OK) { - unlock(call); - return err; - } - - ls->finished_tag = finished_tag; - - reqs[0].op = GRPC_IOREQ_RECV_STATUS; - reqs[0].data.recv_status.user_data = &ls->status; - reqs[0].data.recv_status.set_value = set_status_value_directly; - reqs[1].op = GRPC_IOREQ_RECV_CLOSE; - err = start_ioreq(call, reqs, 2, finish_status, NULL); - unlock(call); - return err; -} - -static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status, - void *tag) {} - -grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call, - gpr_uint32 flags) { - grpc_ioreq req; - grpc_call_error err; - legacy_state *ls; - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_SEND_INITIAL_METADATA; - req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL); - unlock(call); - - return err; -} - -static void finish_read_event(void *p, grpc_op_error error) { - if (p) grpc_byte_buffer_destroy(p); -} - -static void finish_read(grpc_call *call, grpc_op_error error, void *tag) { - legacy_state *ls; - grpc_byte_buffer *msg; - lock(call); - ls = get_legacy_state(call); - msg = ls->msg_in; - grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg); - unlock(call); -} - -grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) { - legacy_state *ls; - grpc_ioreq req; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_READ); - - lock(call); - ls = get_legacy_state(call); - req.op = GRPC_IOREQ_RECV_MESSAGE; - req.data.recv_message = &ls->msg_in; - err = start_ioreq(call, &req, 1, finish_read, tag); - unlock(call); - return err; -} - -static void finish_write(grpc_call *call, grpc_op_error status, void *tag) { - lock(call); - grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out); - unlock(call); - grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_start_write_old(grpc_call *call, - grpc_byte_buffer *byte_buffer, - void *tag, gpr_uint32 flags) { - grpc_ioreq req; - legacy_state *ls; - grpc_call_error err; - - grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - ls->msg_out = grpc_byte_buffer_copy(byte_buffer); - req.op = GRPC_IOREQ_SEND_MESSAGE; - req.data.send_message = ls->msg_out; - err = start_ioreq(call, &req, 1, finish_write, tag); - unlock(call); - - return err; -} - -static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) { - grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status); -} - -grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) { - grpc_ioreq req; - grpc_call_error err; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - req.op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, &req, 1, finish_finish, tag); - unlock(call); - - return err; -} - -grpc_call_error grpc_call_start_write_status_old(grpc_call *call, - grpc_status_code status, - const char *details, - void *tag) { - grpc_ioreq reqs[3]; - grpc_call_error err; - legacy_state *ls; - grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED); - - lock(call); - ls = get_legacy_state(call); - reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA; - reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer]; - reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer]; - reqs[1].op = GRPC_IOREQ_SEND_STATUS; - reqs[1].data.send_status.code = status; - reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details); - reqs[2].op = GRPC_IOREQ_SEND_CLOSE; - err = start_ioreq(call, reqs, 3, finish_finish, tag); - unlock(call); - - return err; -} |