aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/core/ext
diff options
context:
space:
mode:
authorGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-10 17:47:25 -0700
committerGravatar Makarand Dharmapurikar <makarandd@google.com>2016-08-10 17:47:25 -0700
commitf66a37b63aed77732c8d7d253af89296a23bb72e (patch)
treed92b05857fff50bf5ef780d9a8244b52bf592c2a /src/core/ext
parent35da822b442a60d5e9d0711cc046f9c578d63ad9 (diff)
WIP
Diffstat (limited to 'src/core/ext')
-rw-r--r--src/core/ext/transport/cronet/transport/cronet_transport.c597
1 files changed, 356 insertions, 241 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c
index 7e65def4de..1d75603980 100644
--- a/src/core/ext/transport/cronet/transport/cronet_transport.c
+++ b/src/core/ext/transport/cronet/transport/cronet_transport.c
@@ -50,8 +50,17 @@
#include "third_party/objective_c/Cronet/cronet_c_for_grpc.h"
#define GRPC_HEADER_SIZE_IN_BYTES 5
+// maximum ops in a batch. There is not much thinking behind this limit, except
+// that it seems to be enough for most use cases.
+#define MAX_PENDING_OPS 100
+
+#define CRONET_LOG(...) \
+ { \
+ if (grpc_cronet_trace) gpr_log(__VA_ARGS__); \
+ }
-#define CRONET_LOG(...) {if (1) gpr_log(__VA_ARGS__);}
+// TODO (makdharma): Hook up into the wider tracing mechanism
+int grpc_cronet_trace = 1;
enum OP_RESULT {
ACTION_TAKEN_WITH_CALLBACK,
@@ -59,11 +68,10 @@ enum OP_RESULT {
NO_ACTION_POSSIBLE
};
-const char *OP_RESULT_STRING[] = {
- "ACTION_TAKEN_WITH_CALLBACK",
- "ACTION_TAKEN_NO_CALLBACK",
- "NO_ACTION_POSSIBLE"
-};
+// Used for printing debug
+const char *op_result_string[] = {"ACTION_TAKEN_WITH_CALLBACK",
+ "ACTION_TAKEN_NO_CALLBACK",
+ "NO_ACTION_POSSIBLE"};
enum OP_ID {
OP_SEND_INITIAL_METADATA = 0,
@@ -82,32 +90,31 @@ enum OP_ID {
OP_NUM_OPS
};
-const char *op_id_string[] = {
- "OP_SEND_INITIAL_METADATA",
- "OP_SEND_MESSAGE",
- "OP_SEND_TRAILING_METADATA",
- "OP_RECV_MESSAGE",
- "OP_RECV_INITIAL_METADATA",
- "OP_RECV_TRAILING_METADATA",
- "OP_CANCEL_ERROR",
- "OP_ON_COMPLETE",
- "OP_FAILED",
- "OP_SUCCEEDED",
- "OP_CANCELED",
- "OP_RECV_MESSAGE_AND_ON_COMPLETE",
- "OP_READ_REQ_MADE",
- "OP_NUM_OPS"
-};
+const char *op_id_string[] = {"OP_SEND_INITIAL_METADATA",
+ "OP_SEND_MESSAGE",
+ "OP_SEND_TRAILING_METADATA",
+ "OP_RECV_MESSAGE",
+ "OP_RECV_INITIAL_METADATA",
+ "OP_RECV_TRAILING_METADATA",
+ "OP_CANCEL_ERROR",
+ "OP_ON_COMPLETE",
+ "OP_FAILED",
+ "OP_SUCCEEDED",
+ "OP_CANCELED",
+ "OP_RECV_MESSAGE_AND_ON_COMPLETE",
+ "OP_READ_REQ_MADE",
+ "OP_NUM_OPS"};
/* Cronet callbacks */
static void on_request_headers_sent(cronet_bidirectional_stream *);
-static void on_response_headers_received(cronet_bidirectional_stream *,
- const cronet_bidirectional_stream_header_array *,
- const char *);
+static void on_response_headers_received(
+ cronet_bidirectional_stream *,
+ const cronet_bidirectional_stream_header_array *, const char *);
static void on_write_completed(cronet_bidirectional_stream *, const char *);
static void on_read_completed(cronet_bidirectional_stream *, char *, int);
-static void on_response_trailers_received(cronet_bidirectional_stream *,
+static void on_response_trailers_received(
+ cronet_bidirectional_stream *,
const cronet_bidirectional_stream_header_array *);
static void on_succeeded(cronet_bidirectional_stream *);
static void on_failed(cronet_bidirectional_stream *, int);
@@ -120,8 +127,7 @@ static cronet_bidirectional_stream_callback cronet_callbacks = {
on_response_trailers_received,
on_succeeded,
on_failed,
- on_canceled
-};
+ on_canceled};
// Cronet transport object
struct grpc_cronet_transport {
@@ -132,7 +138,7 @@ struct grpc_cronet_transport {
typedef struct grpc_cronet_transport grpc_cronet_transport;
struct read_state {
- // vars to store data coming from cronet
+ /* vars to store data coming from server */
char *read_buffer;
bool length_field_received;
int received_bytes;
@@ -141,15 +147,15 @@ struct read_state {
char grpc_header_bytes[GRPC_HEADER_SIZE_IN_BYTES];
char *payload_field;
- // vars for holding data destined for the application
+ /* vars for holding data destined for the application */
struct grpc_slice_buffer_stream sbs;
gpr_slice_buffer read_slice_buffer;
- // vars for trailing metadata
+ /* vars for trailing metadata */
grpc_chttp2_incoming_metadata_buffer trailing_metadata;
bool trailing_metadata_valid;
- // vars for initial metadata
+ /* vars for initial metadata */
grpc_chttp2_incoming_metadata_buffer initial_metadata;
};
@@ -157,16 +163,13 @@ struct write_state {
char *write_buffer;
};
-// maximum ops in a batch.. There is not much thinking behind this limit, except
-// that it seems to be enough for most use cases.
-#define MAX_PENDING_OPS 100
-
+/* track state of one stream op */
struct op_state {
bool state_op_done[OP_NUM_OPS];
bool state_callback_received[OP_NUM_OPS];
- // data structure for storing data coming from server
+ /* data structure for storing data coming from server */
struct read_state rs;
- // data structure for storing data going to the server
+ /* data structure for storing data going to the server */
struct write_state ws;
};
@@ -174,7 +177,7 @@ struct op_and_state {
grpc_transport_stream_op op;
struct op_state state;
bool done;
- struct stream_obj *s; // Pointer back to the stream object
+ struct stream_obj *s; /* Pointer back to the stream object */
};
struct op_storage {
@@ -190,73 +193,74 @@ struct stream_obj {
grpc_cronet_transport curr_ct;
grpc_stream *curr_gs;
cronet_bidirectional_stream *cbs;
+ cronet_bidirectional_stream_header_array header_array;
- // Used for executing callbacks for ops
+ /* Used for executing callbacks for ops */
grpc_exec_ctx exec_ctx;
- // This holds the state that is at stream level (response and req metadata)
+ /* Stream level state. Some state will be tracked both at stream and stream_op
+ * level */
struct op_state state;
- //struct op_state state;
- // OP storage
+ /* OP storage */
struct op_storage storage;
- // Mutex to protect execute_curr_streaming_op
+ /* Mutex to protect storage */
gpr_mu mu;
};
typedef struct stream_obj stream_obj;
-/* Globals */
-cronet_bidirectional_stream_header_array header_array;
-
-//
static enum OP_RESULT execute_stream_op(struct op_and_state *oas);
-/*************************************************************
- Op Storage
+/*
+ Add a new stream op to op storage.
*/
-
static void add_to_storage(struct stream_obj *s, grpc_transport_stream_op *op) {
+ gpr_mu_lock(&s->mu);
struct op_storage *storage = &s->storage;
GPR_ASSERT(storage->num_pending_ops < MAX_PENDING_OPS);
storage->num_pending_ops++;
CRONET_LOG(GPR_DEBUG, "adding new op @wrptr=%d. %d in the queue.",
- storage->wrptr, storage->num_pending_ops);
- memcpy(&storage->pending_ops[storage->wrptr].op, op, sizeof(grpc_transport_stream_op));
- memset(&storage->pending_ops[storage->wrptr].state, 0, sizeof(storage->pending_ops[storage->wrptr].state));
+ storage->wrptr, storage->num_pending_ops);
+ memcpy(&storage->pending_ops[storage->wrptr].op, op,
+ sizeof(grpc_transport_stream_op));
+ memset(&storage->pending_ops[storage->wrptr].state, 0,
+ sizeof(storage->pending_ops[storage->wrptr].state));
storage->pending_ops[storage->wrptr].done = false;
storage->pending_ops[storage->wrptr].s = s;
- storage->wrptr = (storage->wrptr + 1) % MAX_PENDING_OPS;
+ storage->wrptr = (storage->wrptr + 1) % MAX_PENDING_OPS;
+ gpr_mu_unlock(&s->mu);
}
+/*
+ Cycle through ops and try to take next action. Break when either
+ an action with callback is taken, or no action is possible.
+ This can be executed from the Cronet network thread via cronet callback
+ or on the application supplied thread via the perform_stream_op function.
+*/
static void execute_from_storage(stream_obj *s) {
- // Cycle through ops and try to take next action. Break when either
- // an action with callback is taken, or no action is possible.
- // This can be executed from the Cronet network thread via cronet callback
- // or on the application supplied thread via the perform_stream_op function.
- if (1) {//gpr_mu_lock(&s->mu) == 0) {
- gpr_mu_lock(&s->mu);
- for (int i = 0; i < s->storage.wrptr; ) {
- CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i, s->storage.pending_ops[i].done);
- if (s->storage.pending_ops[i].done) {
- i++;
- continue;
- }
- enum OP_RESULT result = execute_stream_op(&s->storage.pending_ops[i]);
- CRONET_LOG(GPR_DEBUG, "%s = execute_stream_op[%d]", OP_RESULT_STRING[result], i);
- if (result == NO_ACTION_POSSIBLE) {
- i++;
- } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
- break;
- }
+ gpr_mu_lock(&s->mu);
+ for (int i = 0; i < s->storage.wrptr;) {
+ CRONET_LOG(GPR_DEBUG, "calling execute_stream_op[%d]. done = %d", i,
+ s->storage.pending_ops[i].done);
+ if (s->storage.pending_ops[i].done) {
+ i++;
+ continue;
+ }
+ enum OP_RESULT result = execute_stream_op(&s->storage.pending_ops[i]);
+ CRONET_LOG(GPR_DEBUG, "%s = execute_stream_op[%d]",
+ op_result_string[result], i);
+ if (result == NO_ACTION_POSSIBLE) {
+ i++;
+ } else if (result == ACTION_TAKEN_WITH_CALLBACK) {
+ break;
}
- gpr_mu_unlock(&s->mu);
}
+ gpr_mu_unlock(&s->mu);
grpc_exec_ctx_finish(&s->exec_ctx);
}
-
-/*************************************************************
-Cronet Callback Ipmlementation
+/*
+ Cronet callback
*/
static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
CRONET_LOG(GPR_DEBUG, "on_failed(%p, %d)", stream, net_error);
@@ -267,6 +271,9 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) {
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_canceled(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_canceled(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -276,6 +283,9 @@ static void on_canceled(cronet_bidirectional_stream *stream) {
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_succeeded(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "on_succeeded(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
@@ -285,22 +295,33 @@ static void on_succeeded(cronet_bidirectional_stream *stream) {
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_request_headers_sent(cronet_bidirectional_stream *stream) {
CRONET_LOG(GPR_DEBUG, "W: on_request_headers_sent(%p)", stream);
stream_obj *s = (stream_obj *)stream->annotation;
s->state.state_op_done[OP_SEND_INITIAL_METADATA] = true;
s->state.state_callback_received[OP_SEND_INITIAL_METADATA] = true;
+ /* Free the memory allocated for headers */
+ if (s->header_array.headers) {
+ gpr_free(s->header_array.headers);
+ }
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_response_headers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *headers,
const char *negotiated_protocol) {
CRONET_LOG(GPR_DEBUG, "R: on_response_headers_received(%p, %p, %s)", stream,
- headers, negotiated_protocol);
+ headers, negotiated_protocol);
stream_obj *s = (stream_obj *)stream->annotation;
- memset(&s->state.rs.initial_metadata, 0, sizeof(s->state.rs.initial_metadata));
+ memset(&s->state.rs.initial_metadata, 0,
+ sizeof(s->state.rs.initial_metadata));
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.initial_metadata);
unsigned int i = 0;
for (i = 0; i < headers->count; i++) {
@@ -314,6 +335,9 @@ static void on_response_headers_received(
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_write_completed(cronet_bidirectional_stream *stream,
const char *data) {
stream_obj *s = (stream_obj *)stream->annotation;
@@ -326,17 +350,23 @@ static void on_write_completed(cronet_bidirectional_stream *stream,
execute_from_storage(s);
}
+/*
+ Cronet callback
+*/
static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
int count) {
stream_obj *s = (stream_obj *)stream->annotation;
- CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data, count);
+ CRONET_LOG(GPR_DEBUG, "R: on_read_completed(%p, %p, %d)", stream, data,
+ count);
if (count > 0) {
s->state.rs.received_bytes += count;
s->state.rs.remaining_bytes -= count;
if (s->state.rs.remaining_bytes > 0) {
- //CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read");
- s->state.state_op_done[OP_READ_REQ_MADE] = true; // If at least one read request has been made
- cronet_bidirectional_stream_read(s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes, s->state.rs.remaining_bytes);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read");
+ s->state.state_op_done[OP_READ_REQ_MADE] = true;
+ cronet_bidirectional_stream_read(
+ s->cbs, s->state.rs.read_buffer + s->state.rs.received_bytes,
+ s->state.rs.remaining_bytes);
} else {
execute_from_storage(s);
}
@@ -344,76 +374,82 @@ static void on_read_completed(cronet_bidirectional_stream *stream, char *data,
}
}
+/*
+ Cronet callback
+*/
static void on_response_trailers_received(
cronet_bidirectional_stream *stream,
const cronet_bidirectional_stream_header_array *trailers) {
CRONET_LOG(GPR_DEBUG, "R: on_response_trailers_received(%p,%p)", stream,
- trailers);
+ trailers);
stream_obj *s = (stream_obj *)stream->annotation;
- memset(&s->state.rs.trailing_metadata, 0, sizeof(s->state.rs.trailing_metadata));
+ memset(&s->state.rs.trailing_metadata, 0,
+ sizeof(s->state.rs.trailing_metadata));
s->state.rs.trailing_metadata_valid = false;
grpc_chttp2_incoming_metadata_buffer_init(&s->state.rs.trailing_metadata);
unsigned int i = 0;
for (i = 0; i < trailers->count; i++) {
CRONET_LOG(GPR_DEBUG, "trailer key=%s, value=%s", trailers->headers[i].key,
- trailers->headers[i].value);
+ trailers->headers[i].value);
grpc_chttp2_incoming_metadata_buffer_add(
- &s->state.rs.trailing_metadata, grpc_mdelem_from_metadata_strings(
- grpc_mdstr_from_string(trailers->headers[i].key),
- grpc_mdstr_from_string(trailers->headers[i].value)));
+ &s->state.rs.trailing_metadata,
+ grpc_mdelem_from_metadata_strings(
+ grpc_mdstr_from_string(trailers->headers[i].key),
+ grpc_mdstr_from_string(trailers->headers[i].value)));
s->state.rs.trailing_metadata_valid = true;
}
s->state.state_callback_received[OP_RECV_TRAILING_METADATA] = true;
execute_from_storage(s);
}
-/*************************************************************
-Utility functions. Can be in their own file
+/*
+ Utility function that takes the data from s->write_slice_buffer and assembles
+ into a contiguous byte stream with 5 byte gRPC header prepended.
*/
-// This function takes the data from s->write_slice_buffer and assembles into
-// a contiguous byte stream with 5 byte gRPC header prepended.
static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer,
- char **pp_write_buffer, int *p_write_buffer_size) {
+ char **pp_write_buffer,
+ int *p_write_buffer_size) {
gpr_slice slice = gpr_slice_buffer_take_first(write_slice_buffer);
size_t length = GPR_SLICE_LENGTH(slice);
- // TODO (makdharma): FREE THIS!! HACK!
*p_write_buffer_size = (int)length + GRPC_HEADER_SIZE_IN_BYTES;
+ /* This is freed in the on_write_completed callback */
char *write_buffer = gpr_malloc(length + GRPC_HEADER_SIZE_IN_BYTES);
*pp_write_buffer = write_buffer;
uint8_t *p = (uint8_t *)write_buffer;
- // Append 5 byte header
+ /* Append 5 byte header */
*p++ = 0;
*p++ = (uint8_t)(length >> 24);
*p++ = (uint8_t)(length >> 16);
*p++ = (uint8_t)(length >> 8);
*p++ = (uint8_t)(length);
- // append actual data
+ /* append actual data */
memcpy(p, GPR_SLICE_START_PTR(slice), length);
}
+/*
+ Convert metadata in a format that Cronet can consume
+*/
static void convert_metadata_to_cronet_headers(
- grpc_linked_mdelem *head,
- const char *host,
- char **pp_url,
- cronet_bidirectional_stream_header **pp_headers,
- size_t *p_num_headers) {
+ grpc_linked_mdelem *head, const char *host, char **pp_url,
+ cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) {
grpc_linked_mdelem *curr = head;
- // Walk the linked list and get number of header fields
+ /* Walk the linked list and get number of header fields */
uint32_t num_headers_available = 0;
while (curr != NULL) {
curr = curr->next;
num_headers_available++;
}
- // Allocate enough memory. TODO (makdharma): FREE MEMORY! HACK HACK
- cronet_bidirectional_stream_header *headers =
- (cronet_bidirectional_stream_header *)gpr_malloc(
- sizeof(cronet_bidirectional_stream_header) * num_headers_available);
+ /* Allocate enough memory. It is freed in the on_request_headers_sent callback
+ */
+ cronet_bidirectional_stream_header *headers =
+ (cronet_bidirectional_stream_header *)gpr_malloc(
+ sizeof(cronet_bidirectional_stream_header) * num_headers_available);
*pp_headers = headers;
- // Walk the linked list again, this time copying the header fields.
- // s->num_headers
- // can be less than num_headers_available, as some headers are not used for
- // cronet
+ /* Walk the linked list again, this time copying the header fields.
+ s->num_headers can be less than num_headers_available, as some headers
+ are not used for cronet
+ */
curr = head;
int num_headers = 0;
while (num_headers < num_headers_available) {
@@ -423,15 +459,15 @@ static void convert_metadata_to_cronet_headers(
const char *value = grpc_mdstr_as_c_string(mdelem->value);
if (strcmp(key, ":scheme") == 0 || strcmp(key, ":method") == 0 ||
strcmp(key, ":authority") == 0) {
- // Cronet populates these fields on its own.
+ /* Cronet populates these fields on its own */
continue;
}
if (strcmp(key, ":path") == 0) {
- // Create URL by appending :path value to the hostname
+ /* Create URL by appending :path value to the hostname */
gpr_asprintf(pp_url, "https://%s%s", host, value);
continue;
}
- gpr_log(GPR_DEBUG, "header %s = %s", key, value);
+ CRONET_LOG(GPR_DEBUG, "header %s = %s", key, value);
headers[num_headers].key = key;
headers[num_headers].value = value;
num_headers++;
@@ -453,261 +489,342 @@ static int parse_grpc_header(const uint8_t *data) {
}
/*
-Op Execution
+ Op Execution: Decide if one of the actions contained in the stream op can be
+ executed. This is the heart of the state machine.
*/
-static bool op_can_be_run(grpc_transport_stream_op *curr_op, struct op_state *stream_state, struct op_state *op_state, enum OP_ID op_id) {
- // We use op's own state for most state, except for metadata related callbacks, which
- // are at the stream level. TODO: WTF does this comment mean?
+static bool op_can_be_run(grpc_transport_stream_op *curr_op,
+ struct op_state *stream_state,
+ struct op_state *op_state, enum OP_ID op_id) {
bool result = true;
- // When call is canceled, every op can be run
- if (stream_state->state_op_done[OP_CANCEL_ERROR] || stream_state->state_callback_received[OP_FAILED]) {
+ /* When call is canceled, every op can be run, except under following
+ conditions
+ */
+ if (stream_state->state_op_done[OP_CANCEL_ERROR] ||
+ stream_state->state_callback_received[OP_FAILED]) {
if (op_id == OP_SEND_INITIAL_METADATA) result = false;
if (op_id == OP_SEND_MESSAGE) result = false;
if (op_id == OP_SEND_TRAILING_METADATA) result = false;
if (op_id == OP_CANCEL_ERROR) result = false;
- // already executed
- if (op_id == OP_RECV_INITIAL_METADATA && stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
- if (op_id == OP_RECV_MESSAGE && stream_state->state_op_done[OP_RECV_MESSAGE]) result = false;
- if (op_id == OP_RECV_TRAILING_METADATA && stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
+ /* already executed */
+ if (op_id == OP_RECV_INITIAL_METADATA &&
+ stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ result = false;
+ if (op_id == OP_RECV_MESSAGE &&
+ stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ if (op_id == OP_RECV_TRAILING_METADATA &&
+ stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ result = false;
} else if (op_id == OP_SEND_INITIAL_METADATA) {
- // already executed
+ /* already executed */
if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
} else if (op_id == OP_RECV_INITIAL_METADATA) {
- // already executed
+ /* already executed */
if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
- // we haven't sent headers yet.
- else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
- // we haven't received headers yet.
- else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
} else if (op_id == OP_SEND_MESSAGE) {
- // already executed (note we're checking op specific state, not stream state)
+ /* already executed (note we're checking op specific state, not stream
+ state) */
if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
- // we haven't sent headers yet.
- else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
+ /* we haven't sent headers yet. */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
} else if (op_id == OP_RECV_MESSAGE) {
- // already executed
+ /* already executed */
if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
- // we haven't received headers yet.
- else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA]) result = false;
+ /* we haven't received headers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA])
+ result = false;
} else if (op_id == OP_RECV_TRAILING_METADATA) {
- // already executed
+ /* already executed */
if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
- // we have asked for but haven't received message yet.
- else if (stream_state->state_op_done[OP_READ_REQ_MADE] && !stream_state->state_op_done[OP_RECV_MESSAGE]) result = false;
- // we haven't received trailers yet.
- else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA]) result = false;
+ /* we have asked for but haven't received message yet. */
+ else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
+ /* we haven't received trailers yet. */
+ else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
+ result = false;
} else if (op_id == OP_SEND_TRAILING_METADATA) {
- // already executed
+ /* already executed */
if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
- // we haven't sent initial metadata yet
- else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
- // we haven't sent message yet
- // TODO: Streaming Write case is a problem. What if there is an outstanding write (2nd, 3rd,..) present.
- else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
- // we haven't got on_write_completed for the send yet
- else if (stream_state->state_op_done[OP_SEND_MESSAGE] && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
+ /* we haven't sent initial metadata yet */
+ else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ /* we haven't sent message yet */
+ else if (curr_op->send_message &&
+ !stream_state->state_op_done[OP_SEND_MESSAGE])
+ result = false;
+ /* we haven't got on_write_completed for the send yet */
+ else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
} else if (op_id == OP_CANCEL_ERROR) {
- // already executed
+ /* already executed */
if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
} else if (op_id == OP_ON_COMPLETE) {
- // already executed (note we're checking op specific state, not stream state)
+ /* already executed (note we're checking op specific state, not stream
+ state) */
if (op_state->state_op_done[OP_ON_COMPLETE]) result = false;
- // Check if every op that was asked for is done.
- else if (curr_op->send_initial_metadata && !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) result = false;
- else if (curr_op->send_message && !stream_state->state_op_done[OP_SEND_MESSAGE]) result = false;
- else if (curr_op->send_message && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
- else if (curr_op->send_trailing_metadata && !stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
- else if (curr_op->recv_initial_metadata && !stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
- else if (curr_op->recv_message && !stream_state->state_op_done[OP_RECV_MESSAGE]) result = false;
+ /* Check if every op that was asked for is done. */
+ else if (curr_op->send_initial_metadata &&
+ !stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
+ result = false;
+ else if (curr_op->send_message &&
+ !stream_state->state_op_done[OP_SEND_MESSAGE])
+ result = false;
+ else if (curr_op->send_message &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
+ else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_op_done[OP_SEND_TRAILING_METADATA])
+ result = false;
+ else if (curr_op->recv_initial_metadata &&
+ !stream_state->state_op_done[OP_RECV_INITIAL_METADATA])
+ result = false;
+ else if (curr_op->recv_message &&
+ !stream_state->state_op_done[OP_RECV_MESSAGE])
+ result = false;
else if (curr_op->recv_trailing_metadata) {
- //if (!stream_state->state_op_done[OP_SUCCEEDED]) result = false; gpr_log(GPR_DEBUG, "HACK!!");
- // We aren't done with trailing metadata yet
- if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
- // We've asked for actual message in an earlier op, and it hasn't been delivered yet.
- // (TODO: What happens when multiple messages are asked for? How do you know when last message arrived?)
+ /* We aren't done with trailing metadata yet */
+ if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
+ result = false;
+ /* We've asked for actual message in an earlier op, and it hasn't been
+ delivered yet. */
else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
- // If this op is not the one asking for read, (which means some earlier op has asked), and the
- // read hasn't been delivered.
- if(!curr_op->recv_message && !stream_state->state_op_done[OP_SUCCEEDED]) result = false;
+ /* If this op is not the one asking for read, (which means some earlier
+ op has asked), and the read hasn't been delivered. */
+ if (!curr_op->recv_message &&
+ !stream_state->state_op_done[OP_SUCCEEDED])
+ result = false;
}
}
- // We should see at least one on_write_completed for the trailers that we sent
- else if (curr_op->send_trailing_metadata && !stream_state->state_callback_received[OP_SEND_MESSAGE]) result = false;
+ /* We should see at least one on_write_completed for the trailers that we
+ sent */
+ else if (curr_op->send_trailing_metadata &&
+ !stream_state->state_callback_received[OP_SEND_MESSAGE])
+ result = false;
}
- CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string[op_id], result? "YES":"NO");
+ CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string[op_id],
+ result ? "YES" : "NO");
return result;
}
static enum OP_RESULT execute_stream_op(struct op_and_state *oas) {
- // TODO TODO : This can be called from network thread and main thread. add a mutex.
grpc_transport_stream_op *stream_op = &oas->op;
struct stream_obj *s = oas->s;
struct op_state *stream_state = &s->state;
- //CRONET_LOG(GPR_DEBUG, "execute_stream_op");
enum OP_RESULT result = NO_ACTION_POSSIBLE;
- if (stream_op->send_initial_metadata && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_INITIAL_METADATA)) {
+ if (stream_op->send_initial_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
- // This OP is the beginning. Reset various states
+ /* This OP is the beginning. Reset various states */
memset(&stream_state->rs, 0, sizeof(stream_state->rs));
memset(&stream_state->ws, 0, sizeof(stream_state->ws));
memset(stream_state->state_op_done, 0, sizeof(stream_state->state_op_done));
- memset(stream_state->state_callback_received, 0, sizeof(stream_state->state_callback_received));
- // Start new cronet stream
+ memset(stream_state->state_callback_received, 0,
+ sizeof(stream_state->state_callback_received));
+ /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
+ * on_failed */
GPR_ASSERT(s->cbs == NULL);
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_create");
- s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks);
+ s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs,
+ &cronet_callbacks);
char *url;
- convert_metadata_to_cronet_headers(stream_op->send_initial_metadata->list.head,
- s->curr_ct.host, &url, &header_array.headers, &header_array.count);
- header_array.capacity = header_array.count;
+ convert_metadata_to_cronet_headers(
+ stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url,
+ &s->header_array.headers, &s->header_array.count);
+ s->header_array.capacity = s->header_array.count;
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start %s", url);
- cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &header_array, false);
+ cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array,
+ false);
stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_op->recv_initial_metadata &&
- op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_INITIAL_METADATA)) {
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_INITIAL_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_INITIAL_METADATA", oas);
if (!stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_chttp2_incoming_metadata_buffer_publish(&oas->s->state.rs.initial_metadata,
- stream_op->recv_initial_metadata);
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_NONE, NULL);
+ grpc_chttp2_incoming_metadata_buffer_publish(
+ &oas->s->state.rs.initial_metadata, stream_op->recv_initial_metadata);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_NONE, NULL);
} else {
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready, GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_initial_metadata_ready,
+ GRPC_ERROR_CANCELLED, NULL);
}
stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
- } else if (stream_op->send_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_MESSAGE)) {
+ } else if (stream_op->send_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_MESSAGE", oas);
- // TODO (makdharma): Make into a standalone function
gpr_slice_buffer write_slice_buffer;
gpr_slice slice;
gpr_slice_buffer_init(&write_slice_buffer);
grpc_byte_stream_next(NULL, stream_op->send_message, &slice,
stream_op->send_message->length, NULL);
- // Check that compression flag is not ON. We don't support compression yet.
- // TODO (makdharma): add compression support
+ /* Check that compression flag is OFF. We don't support compression yet. */
GPR_ASSERT(stream_op->send_message->flags == 0);
gpr_slice_buffer_add(&write_slice_buffer, slice);
- GPR_ASSERT(write_slice_buffer.count == 1); // Empty request not handled yet
+ GPR_ASSERT(write_slice_buffer.count ==
+ 1); /* Empty request not handled yet */
if (write_slice_buffer.count > 0) {
int write_buffer_size;
- create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer, &write_buffer_size);
- CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p)", stream_state->ws.write_buffer);
+ create_grpc_frame(&write_slice_buffer, &stream_state->ws.write_buffer,
+ &write_buffer_size);
+ CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (%p)",
+ stream_state->ws.write_buffer);
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
- write_buffer_size, false); // TODO: What if this is not the last write?
+ write_buffer_size, false);
result = ACTION_TAKEN_WITH_CALLBACK;
}
stream_state->state_op_done[OP_SEND_MESSAGE] = true;
oas->state.state_op_done[OP_SEND_MESSAGE] = true;
- } else if (stream_op->recv_message && op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_MESSAGE)) {
+ } else if (stream_op->recv_message &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_MESSAGE)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_MESSAGE", oas);
if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_CANCELLED, NULL);
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_CANCELLED, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
} else if (stream_state->rs.length_field_received == false) {
- if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES && stream_state->rs.remaining_bytes == 0) {
- // Start a read operation for data
+ if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
+ stream_state->rs.remaining_bytes == 0) {
+ /* Start a read operation for data */
stream_state->rs.length_field_received = true;
stream_state->rs.length_field = stream_state->rs.remaining_bytes =
- parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
- CRONET_LOG(GPR_DEBUG, "length field = %d", stream_state->rs.length_field);
+ parse_grpc_header((const uint8_t *)stream_state->rs.read_buffer);
+ CRONET_LOG(GPR_DEBUG, "length field = %d",
+ stream_state->rs.length_field);
if (stream_state->rs.length_field > 0) {
- stream_state->rs.read_buffer = gpr_malloc(stream_state->rs.length_field);
+ stream_state->rs.read_buffer =
+ gpr_malloc(stream_state->rs.length_field);
GPR_ASSERT(stream_state->rs.read_buffer);
stream_state->rs.remaining_bytes = stream_state->rs.length_field;
stream_state->rs.received_bytes = 0;
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read");
- stream_state->state_op_done[OP_READ_REQ_MADE] = true; // If at least one read request has been made
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
- stream_state->rs.remaining_bytes);
+ stream_state->rs.remaining_bytes);
result = ACTION_TAKEN_WITH_CALLBACK;
} else {
stream_state->rs.remaining_bytes = 0;
CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
- *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
- oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_state->rs.remaining_bytes == 0) {
- // Start a read operation for first 5 bytes (GRPC header)
+ /* Start a read operation for first 5 bytes (GRPC header) */
stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
stream_state->rs.received_bytes = 0;
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_read");
- stream_state->state_op_done[OP_READ_REQ_MADE] = true; // If at least one read request has been made
+ stream_state->state_op_done[OP_READ_REQ_MADE] =
+ true; /* Indicates that at least one read request has been made */
cronet_bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
stream_state->rs.remaining_bytes);
}
result = ACTION_TAKEN_WITH_CALLBACK;
} else if (stream_state->rs.remaining_bytes == 0) {
CRONET_LOG(GPR_DEBUG, "read operation complete");
- gpr_slice read_data_slice = gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
+ gpr_slice read_data_slice =
+ gpr_slice_malloc((uint32_t)stream_state->rs.length_field);
uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice);
- memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field);
+ memcpy(dst_p, stream_state->rs.read_buffer,
+ (size_t)stream_state->rs.length_field);
gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer);
- gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice);
- grpc_slice_buffer_stream_init(&stream_state->rs.sbs, &stream_state->rs.read_slice_buffer, 0);
- *((grpc_byte_buffer **)stream_op->recv_message) = (grpc_byte_buffer *)&stream_state->rs.sbs;
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready, GRPC_ERROR_NONE, NULL);
+ gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer,
+ read_data_slice);
+ grpc_slice_buffer_stream_init(&stream_state->rs.sbs,
+ &stream_state->rs.read_slice_buffer, 0);
+ *((grpc_byte_buffer **)stream_op->recv_message) =
+ (grpc_byte_buffer *)&stream_state->rs.sbs;
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->recv_message_ready,
+ GRPC_ERROR_NONE, NULL);
stream_state->state_op_done[OP_RECV_MESSAGE] = true;
- oas->state.state_op_done[OP_RECV_MESSAGE] = true; // Also set per op state.
- // Clear read state of the stream, so next read op (if it were to come) will work
- stream_state->rs.received_bytes = stream_state->rs.remaining_bytes = stream_state->rs.length_field_received = 0;
+ oas->state.state_op_done[OP_RECV_MESSAGE] = true;
+ /* Clear read state of the stream, so next read op (if it were to come)
+ * will work */
+ stream_state->rs.received_bytes = stream_state->rs.remaining_bytes =
+ stream_state->rs.length_field_received = 0;
result = ACTION_TAKEN_NO_CALLBACK;
}
} else if (stream_op->recv_trailing_metadata &&
- op_can_be_run(stream_op, stream_state, &oas->state, OP_RECV_TRAILING_METADATA)) {
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_RECV_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_RECV_TRAILING_METADATA", oas);
if (oas->s->state.rs.trailing_metadata_valid) {
grpc_chttp2_incoming_metadata_buffer_publish(
- &oas->s->state.rs.trailing_metadata, stream_op->recv_trailing_metadata);
+ &oas->s->state.rs.trailing_metadata,
+ stream_op->recv_trailing_metadata);
stream_state->rs.trailing_metadata_valid = false;
}
stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
result = ACTION_TAKEN_NO_CALLBACK;
- } else if (stream_op->send_trailing_metadata && op_can_be_run(stream_op, stream_state, &oas->state, OP_SEND_TRAILING_METADATA)) {
+ } else if (stream_op->send_trailing_metadata &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_SEND_TRAILING_METADATA)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_TRAILING_METADATA", oas);
CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_write (0)");
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
cronet_bidirectional_stream_write(s->cbs, "", 0, true);
stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
- } else if (stream_op->cancel_error && op_can_be_run(stream_op, stream_state, &oas->state, OP_CANCEL_ERROR)) {
+ } else if (stream_op->cancel_error &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_CANCEL_ERROR)) {
CRONET_LOG(GPR_DEBUG, "running: %p OP_CANCEL_ERROR", oas);
CRONET_LOG(GPR_DEBUG, "W: cronet_bidirectional_stream_cancel(%p)", s->cbs);
- // Cancel might have come before creation of stream
if (s->cbs) {
cronet_bidirectional_stream_cancel(s->cbs);
}
stream_state->state_op_done[OP_CANCEL_ERROR] = true;
result = ACTION_TAKEN_WITH_CALLBACK;
- } else if (stream_op->on_complete && op_can_be_run(stream_op, stream_state, &oas->state, OP_ON_COMPLETE)) {
- // All ops are complete. Call the on_complete callback
+ } else if (stream_op->on_complete &&
+ op_can_be_run(stream_op, stream_state, &oas->state,
+ OP_ON_COMPLETE)) {
+ /* All actions in this stream_op are complete. Call the on_complete callback
+ */
CRONET_LOG(GPR_DEBUG, "running: %p OP_ON_COMPLETE", oas);
- //CRONET_LOG(GPR_DEBUG, "calling on_complete");
- grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE, NULL);
- // Instead of setting stream state, use the op state as on_complete is on per op basis
+ grpc_exec_ctx_sched(&s->exec_ctx, stream_op->on_complete, GRPC_ERROR_NONE,
+ NULL);
oas->state.state_op_done[OP_ON_COMPLETE] = true;
- oas->done = true; // Mark this op as completed
- // reset any send or receive message state.
+ oas->done = true;
+ /* reset any send or receive message state. */
stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
stream_state->state_op_done[OP_SEND_MESSAGE] = false;
result = ACTION_TAKEN_NO_CALLBACK;
- // If this is the on_complete callback being called for a received message - make a note
- if (stream_op->recv_message) stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
+ /* If this is the on_complete callback being called for a received message -
+ make a note */
+ if (stream_op->recv_message)
+ stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
} else {
- //CRONET_LOG(GPR_DEBUG, "No op ready to run");
result = NO_ACTION_POSSIBLE;
}
return result;
}
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+/*
+ Functions used by upper layers to access transport functionality.
+*/
static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
grpc_stream *gs, grpc_stream_refcount *refcount,
@@ -720,7 +837,8 @@ static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
memset(&s->state.rs, 0, sizeof(s->state.rs));
memset(&s->state.ws, 0, sizeof(s->state.ws));
memset(s->state.state_op_done, 0, sizeof(s->state.state_op_done));
- memset(s->state.state_callback_received, 0, sizeof(s->state.state_callback_received));
+ memset(s->state.state_callback_received, 0,
+ sizeof(s->state.state_callback_received));
gpr_mu_init(&s->mu);
s->exec_ctx = *exec_ctx;
return 0;
@@ -744,19 +862,16 @@ static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
}
static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_stream *gs, void *and_free_memory) {
-}
+ grpc_stream *gs, void *and_free_memory) {}
-static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
-}
+static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {}
static char *get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
return NULL;
}
static void perform_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
- grpc_transport_op *op) {
-}
+ grpc_transport_op *op) {}
const grpc_transport_vtable grpc_cronet_vtable = {sizeof(stream_obj),
"cronet_http",