aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/surface/call.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/surface/call.c')
-rw-r--r--src/core/surface/call.c333
1 files changed, 6 insertions, 327 deletions
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index fc07a19894..070be1b25a 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -47,9 +47,6 @@
#include <stdlib.h>
#include <string.h>
-typedef struct legacy_state legacy_state;
-static void destroy_legacy_state(legacy_state *ls);
-
typedef enum { REQ_INITIAL = 0, REQ_READY, REQ_DONE } req_state;
typedef enum {
@@ -226,10 +223,6 @@ struct grpc_call {
gpr_slice_buffer incoming_message;
gpr_uint32 incoming_message_length;
-
- /* Data that the legacy api needs to track. To be deleted at some point
- soon */
- legacy_state *legacy_state;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -353,9 +346,6 @@ static void destroy_call(void *call, int ignored_success) {
}
grpc_sopb_destroy(&c->send_ops);
grpc_sopb_destroy(&c->recv_ops);
- if (c->legacy_state) {
- destroy_legacy_state(c->legacy_state);
- }
grpc_bbq_destroy(&c->incoming_queue);
gpr_slice_buffer_destroy(&c->incoming_message);
gpr_free(c);
@@ -404,12 +394,6 @@ static void set_status_details(grpc_call *call, status_source source,
call->status[source].details = status;
}
-static grpc_call_error bind_cq(grpc_call *call, grpc_completion_queue *cq) {
- if (call->cq) return GRPC_CALL_ERROR_ALREADY_INVOKED;
- call->cq = cq;
- return GRPC_CALL_OK;
-}
-
static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
gpr_uint8 set = call->request_set[op];
reqinfo_master *master;
@@ -729,6 +713,10 @@ static void call_on_done_recv(void *pc, int success) {
if (call->recv_state == GRPC_STREAM_CLOSED) {
GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
call->read_state = READ_STATE_STREAM_CLOSED;
+ if (call->have_alarm) {
+ grpc_alarm_cancel(&call->alarm);
+ call->have_alarm = 0;
+ }
}
finish_read_ops(call);
} else {
@@ -1157,7 +1145,7 @@ static void set_cancelled_value(grpc_status_code status, void *dest) {
}
static void finish_batch(grpc_call *call, grpc_op_error result, void *tag) {
- grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+ grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
}
grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
@@ -1172,7 +1160,7 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
if (nops == 0) {
grpc_cq_begin_op(call->cq, call, GRPC_OP_COMPLETE);
- grpc_cq_end_op_complete(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
+ grpc_cq_end_op(call->cq, tag, call, do_nothing, NULL, GRPC_OP_OK);
return GRPC_CALL_OK;
}
@@ -1268,312 +1256,3 @@ grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
return grpc_call_start_ioreq_and_call_back(call, reqs, out, finish_batch,
tag);
}
-
-/*
- * LEGACY API IMPLEMENTATION
- * All this code will disappear as soon as wrappings are updated
- */
-
-struct legacy_state {
- gpr_uint8 md_out_buffer;
- size_t md_out_count[2];
- size_t md_out_capacity[2];
- grpc_metadata *md_out[2];
- grpc_byte_buffer *msg_out;
-
- /* input buffers */
- grpc_metadata_array initial_md_in;
- grpc_metadata_array trailing_md_in;
-
- size_t details_capacity;
- char *details;
- grpc_status_code status;
-
- char *send_details;
-
- size_t msg_in_read_idx;
- grpc_byte_buffer *msg_in;
-
- void *finished_tag;
-};
-
-static legacy_state *get_legacy_state(grpc_call *call) {
- if (call->legacy_state == NULL) {
- call->legacy_state = gpr_malloc(sizeof(legacy_state));
- memset(call->legacy_state, 0, sizeof(legacy_state));
- }
- return call->legacy_state;
-}
-
-static void destroy_legacy_state(legacy_state *ls) {
- size_t i, j;
- for (i = 0; i < 2; i++) {
- for (j = 0; j < ls->md_out_count[i]; j++) {
- gpr_free((char *)ls->md_out[i][j].key);
- gpr_free((char *)ls->md_out[i][j].value);
- }
- gpr_free(ls->md_out[i]);
- }
- gpr_free(ls->initial_md_in.metadata);
- gpr_free(ls->trailing_md_in.metadata);
- gpr_free(ls->details);
- gpr_free(ls->send_details);
- gpr_free(ls);
-}
-
-grpc_call_error grpc_call_add_metadata_old(grpc_call *call,
- grpc_metadata *metadata,
- gpr_uint32 flags) {
- legacy_state *ls;
- grpc_metadata *mdout;
-
- lock(call);
- ls = get_legacy_state(call);
-
- if (ls->md_out_count[ls->md_out_buffer] ==
- ls->md_out_capacity[ls->md_out_buffer]) {
- ls->md_out_capacity[ls->md_out_buffer] =
- GPR_MAX(ls->md_out_capacity[ls->md_out_buffer] * 3 / 2,
- ls->md_out_capacity[ls->md_out_buffer] + 8);
- ls->md_out[ls->md_out_buffer] = gpr_realloc(
- ls->md_out[ls->md_out_buffer],
- sizeof(grpc_metadata) * ls->md_out_capacity[ls->md_out_buffer]);
- }
- mdout = &ls->md_out[ls->md_out_buffer][ls->md_out_count[ls->md_out_buffer]++];
- mdout->key = gpr_strdup(metadata->key);
- mdout->value = gpr_malloc(metadata->value_length);
- mdout->value_length = metadata->value_length;
- memcpy((char *)mdout->value, metadata->value, metadata->value_length);
-
- unlock(call);
-
- return GRPC_CALL_OK;
-}
-
-static void finish_status(grpc_call *call, grpc_op_error status,
- void *ignored) {
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- grpc_cq_end_finished(call->cq, ls->finished_tag, call, do_nothing, NULL,
- ls->status, ls->details, ls->trailing_md_in.metadata,
- ls->trailing_md_in.count);
- unlock(call);
-}
-
-static void finish_recv_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- if (status == GRPC_OP_OK) {
- grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL,
- ls->initial_md_in.count,
- ls->initial_md_in.metadata);
-
- } else {
- grpc_cq_end_client_metadata_read(call->cq, tag, call, do_nothing, NULL, 0,
- NULL);
- }
- unlock(call);
-}
-
-static void finish_send_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {}
-
-grpc_call_error grpc_call_invoke_old(grpc_call *call, grpc_completion_queue *cq,
- void *metadata_read_tag,
- void *finished_tag, gpr_uint32 flags) {
- grpc_ioreq reqs[4];
- legacy_state *ls;
- grpc_call_error err;
-
- grpc_cq_begin_op(cq, call, GRPC_CLIENT_METADATA_READ);
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-
- lock(call);
- ls = get_legacy_state(call);
- err = bind_cq(call, cq);
- if (err != GRPC_CALL_OK) goto done;
-
- ls->finished_tag = finished_tag;
-
- reqs[0].op = GRPC_IOREQ_SEND_INITIAL_METADATA;
- reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- ls->md_out_buffer++;
- err = start_ioreq(call, reqs, 1, finish_send_metadata, NULL);
- if (err != GRPC_CALL_OK) goto done;
-
- reqs[0].op = GRPC_IOREQ_RECV_INITIAL_METADATA;
- reqs[0].data.recv_metadata = &ls->initial_md_in;
- err = start_ioreq(call, reqs, 1, finish_recv_metadata, metadata_read_tag);
- if (err != GRPC_CALL_OK) goto done;
-
- reqs[0].op = GRPC_IOREQ_RECV_TRAILING_METADATA;
- reqs[0].data.recv_metadata = &ls->trailing_md_in;
- reqs[1].op = GRPC_IOREQ_RECV_STATUS;
- reqs[1].data.recv_status.user_data = &ls->status;
- reqs[1].data.recv_status.set_value = set_status_value_directly;
- reqs[2].op = GRPC_IOREQ_RECV_STATUS_DETAILS;
- reqs[2].data.recv_status_details.details = &ls->details;
- reqs[2].data.recv_status_details.details_capacity = &ls->details_capacity;
- reqs[3].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 4, finish_status, NULL);
- if (err != GRPC_CALL_OK) goto done;
-
-done:
- unlock(call);
- return err;
-}
-
-grpc_call_error grpc_call_server_accept_old(grpc_call *call,
- grpc_completion_queue *cq,
- void *finished_tag) {
- grpc_ioreq reqs[2];
- grpc_call_error err;
- legacy_state *ls;
-
- /* inform the completion queue of an incoming operation (corresponding to
- finished_tag) */
- grpc_cq_begin_op(cq, call, GRPC_FINISHED);
-
- lock(call);
- ls = get_legacy_state(call);
-
- err = bind_cq(call, cq);
- if (err != GRPC_CALL_OK) {
- unlock(call);
- return err;
- }
-
- ls->finished_tag = finished_tag;
-
- reqs[0].op = GRPC_IOREQ_RECV_STATUS;
- reqs[0].data.recv_status.user_data = &ls->status;
- reqs[0].data.recv_status.set_value = set_status_value_directly;
- reqs[1].op = GRPC_IOREQ_RECV_CLOSE;
- err = start_ioreq(call, reqs, 2, finish_status, NULL);
- unlock(call);
- return err;
-}
-
-static void finish_send_initial_metadata(grpc_call *call, grpc_op_error status,
- void *tag) {}
-
-grpc_call_error grpc_call_server_end_initial_metadata_old(grpc_call *call,
- gpr_uint32 flags) {
- grpc_ioreq req;
- grpc_call_error err;
- legacy_state *ls;
-
- lock(call);
- ls = get_legacy_state(call);
- req.op = GRPC_IOREQ_SEND_INITIAL_METADATA;
- req.data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- req.data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- err = start_ioreq(call, &req, 1, finish_send_initial_metadata, NULL);
- unlock(call);
-
- return err;
-}
-
-static void finish_read_event(void *p, grpc_op_error error) {
- if (p) grpc_byte_buffer_destroy(p);
-}
-
-static void finish_read(grpc_call *call, grpc_op_error error, void *tag) {
- legacy_state *ls;
- grpc_byte_buffer *msg;
- lock(call);
- ls = get_legacy_state(call);
- msg = ls->msg_in;
- grpc_cq_end_read(call->cq, tag, call, finish_read_event, msg, msg);
- unlock(call);
-}
-
-grpc_call_error grpc_call_start_read_old(grpc_call *call, void *tag) {
- legacy_state *ls;
- grpc_ioreq req;
- grpc_call_error err;
-
- grpc_cq_begin_op(call->cq, call, GRPC_READ);
-
- lock(call);
- ls = get_legacy_state(call);
- req.op = GRPC_IOREQ_RECV_MESSAGE;
- req.data.recv_message = &ls->msg_in;
- err = start_ioreq(call, &req, 1, finish_read, tag);
- unlock(call);
- return err;
-}
-
-static void finish_write(grpc_call *call, grpc_op_error status, void *tag) {
- lock(call);
- grpc_byte_buffer_destroy(get_legacy_state(call)->msg_out);
- unlock(call);
- grpc_cq_end_write_accepted(call->cq, tag, call, do_nothing, NULL, status);
-}
-
-grpc_call_error grpc_call_start_write_old(grpc_call *call,
- grpc_byte_buffer *byte_buffer,
- void *tag, gpr_uint32 flags) {
- grpc_ioreq req;
- legacy_state *ls;
- grpc_call_error err;
-
- grpc_cq_begin_op(call->cq, call, GRPC_WRITE_ACCEPTED);
-
- lock(call);
- ls = get_legacy_state(call);
- ls->msg_out = grpc_byte_buffer_copy(byte_buffer);
- req.op = GRPC_IOREQ_SEND_MESSAGE;
- req.data.send_message = ls->msg_out;
- err = start_ioreq(call, &req, 1, finish_write, tag);
- unlock(call);
-
- return err;
-}
-
-static void finish_finish(grpc_call *call, grpc_op_error status, void *tag) {
- grpc_cq_end_finish_accepted(call->cq, tag, call, do_nothing, NULL, status);
-}
-
-grpc_call_error grpc_call_writes_done_old(grpc_call *call, void *tag) {
- grpc_ioreq req;
- grpc_call_error err;
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- lock(call);
- req.op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, &req, 1, finish_finish, tag);
- unlock(call);
-
- return err;
-}
-
-grpc_call_error grpc_call_start_write_status_old(grpc_call *call,
- grpc_status_code status,
- const char *details,
- void *tag) {
- grpc_ioreq reqs[3];
- grpc_call_error err;
- legacy_state *ls;
- grpc_cq_begin_op(call->cq, call, GRPC_FINISH_ACCEPTED);
-
- lock(call);
- ls = get_legacy_state(call);
- reqs[0].op = GRPC_IOREQ_SEND_TRAILING_METADATA;
- reqs[0].data.send_metadata.count = ls->md_out_count[ls->md_out_buffer];
- reqs[0].data.send_metadata.metadata = ls->md_out[ls->md_out_buffer];
- reqs[1].op = GRPC_IOREQ_SEND_STATUS;
- reqs[1].data.send_status.code = status;
- reqs[1].data.send_status.details = ls->send_details = gpr_strdup(details);
- reqs[2].op = GRPC_IOREQ_SEND_CLOSE;
- err = start_ioreq(call, reqs, 3, finish_finish, tag);
- unlock(call);
-
- return err;
-}