diff options
author | Yuchen Zeng <zyc@google.com> | 2016-10-10 15:28:50 -0700 |
---|---|---|
committer | Yuchen Zeng <zyc@google.com> | 2016-10-10 15:28:50 -0700 |
commit | 3efcb48c2621403e7cd89cc347e4fdcc501d8ed3 (patch) | |
tree | fa4c536b13ffeaa20a70a3860ffa1e02e6863f0d /src/core/ext | |
parent | ac8bc42c8f8f835047550e70850e1f5fb2d56f97 (diff) | |
parent | 062ba7b8baefdc76f74fffb9aa3e2134ba047ea6 (diff) |
Merge remote-tracking branch 'upstream/master' into call_holder_add_pollent
Diffstat (limited to 'src/core/ext')
-rw-r--r-- | src/core/ext/client_config/client_channel.c | 4 | ||||
-rw-r--r-- | src/core/ext/client_config/lb_policy.h | 23 | ||||
-rw-r--r-- | src/core/ext/lb_policy/grpclb/grpclb.c | 2 | ||||
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_transport.c | 33 |
4 files changed, 44 insertions, 18 deletions
diff --git a/src/core/ext/client_config/client_channel.c b/src/core/ext/client_config/client_channel.c index a332f47413..cbf79afa17 100644 --- a/src/core/ext/client_config/client_channel.c +++ b/src/core/ext/client_config/client_channel.c @@ -111,10 +111,10 @@ static void set_channel_connectivity_state_locked(grpc_exec_ctx *exec_ctx, if ((state == GRPC_CHANNEL_TRANSIENT_FAILURE || state == GRPC_CHANNEL_SHUTDOWN) && chand->lb_policy != NULL) { - /* cancel fail-fast picks */ + /* cancel picks with wait_for_ready=false */ grpc_lb_policy_cancel_picks( exec_ctx, chand->lb_policy, - /* mask= */ GRPC_INITIAL_METADATA_IGNORE_CONNECTIVITY, + /* mask= */ GRPC_INITIAL_METADATA_WAIT_FOR_READY, /* check= */ 0, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state, error, diff --git a/src/core/ext/client_config/lb_policy.h b/src/core/ext/client_config/lb_policy.h index ef228d0ebc..de424cd105 100644 --- a/src/core/ext/client_config/lb_policy.h +++ b/src/core/ext/client_config/lb_policy.h @@ -139,16 +139,19 @@ void grpc_lb_policy_weak_unref(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy); void grpc_lb_policy_init(grpc_lb_policy *policy, const grpc_lb_policy_vtable *vtable); -/** Find an appropriate target for this call, based on \a pick_args. - Picking can be synchronous or asynchronous. In the synchronous case, when a - pick is readily available, it'll be returned in \a target and a non-zero - value will be returned. - In the asynchronous case, zero is returned and \a on_complete will be called - once \a target and \a user_data have been set. Any IO should be done under - the \a interested_parties \a grpc_pollset_set in the \a grpc_lb_policy - struct. The opaque \a user_data output argument corresponds to information - that may need be propagated from the LB policy. It may be NULL. Errors are - signaled receiving a NULL \a *target. */ +/** Finds an appropriate subchannel for a call, based on \a pick_args. + + \a target will be set to the selected subchannel, or NULL on failure. + Upon success, \a user_data will be set to whatever opaque information + may need to be propagated from the LB policy, or NULL if not needed. + + If the pick succeeds and a result is known immediately, a non-zero + value will be returned. Otherwise, \a on_complete will be invoked + once the pick is complete with its error argument set to indicate + success or failure. + + Any IO should be done under the \a interested_parties \a grpc_pollset_set + in the \a grpc_lb_policy struct. */ int grpc_lb_policy_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *policy, const grpc_lb_policy_pick_args *pick_args, grpc_connected_subchannel **target, void **user_data, diff --git a/src/core/ext/lb_policy/grpclb/grpclb.c b/src/core/ext/lb_policy/grpclb/grpclb.c index 21074b5d31..29d848589a 100644 --- a/src/core/ext/lb_policy/grpclb/grpclb.c +++ b/src/core/ext/lb_policy/grpclb/grpclb.c @@ -760,7 +760,7 @@ static int glb_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, GRPC_ERROR_CREATE("No mdelem storage for the LB token. Load reporting " "won't work without it. Failing"), NULL); - return 1; + return 0; } glb_lb_policy *glb_policy = (glb_lb_policy *)pol; diff --git a/src/core/ext/transport/cronet/transport/cronet_transport.c b/src/core/ext/transport/cronet/transport/cronet_transport.c index 366690acf2..25ad40b935 100644 --- a/src/core/ext/transport/cronet/transport/cronet_transport.c +++ b/src/core/ext/transport/cronet/transport/cronet_transport.c @@ -239,6 +239,14 @@ static const char *op_id_string(enum e_op_id i) { return "UNKNOWN"; } +static void free_read_buffer(stream_obj *s) { + if (s->state.rs.read_buffer && + s->state.rs.read_buffer != s->state.rs.grpc_header_bytes) { + gpr_free(s->state.rs.read_buffer); + s->state.rs.read_buffer = NULL; + } +} + /* Add a new stream op to op storage. */ @@ -341,6 +349,7 @@ static void on_failed(cronet_bidirectional_stream *stream, int net_error) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } + free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -363,6 +372,7 @@ static void on_canceled(cronet_bidirectional_stream *stream) { gpr_free(s->state.ws.write_buffer); s->state.ws.write_buffer = NULL; } + free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -377,6 +387,7 @@ static void on_succeeded(cronet_bidirectional_stream *stream) { cronet_bidirectional_stream_destroy(s->cbs); s->state.state_callback_received[OP_SUCCEEDED] = true; s->cbs = NULL; + free_read_buffer(s); gpr_mu_unlock(&s->mu); execute_from_storage(s); } @@ -531,7 +542,8 @@ static void create_grpc_frame(gpr_slice_buffer *write_slice_buffer, */ static void convert_metadata_to_cronet_headers( grpc_linked_mdelem *head, const char *host, char **pp_url, - cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers) { + cronet_bidirectional_stream_header **pp_headers, size_t *p_num_headers, + const char **method) { grpc_linked_mdelem *curr = head; /* Walk the linked list and get number of header fields */ size_t num_headers_available = 0; @@ -558,11 +570,20 @@ static void convert_metadata_to_cronet_headers( curr = curr->next; const char *key = grpc_mdstr_as_c_string(mdelem->key); const char *value = grpc_mdstr_as_c_string(mdelem->value); - if (mdelem->key == GRPC_MDSTR_METHOD || mdelem->key == GRPC_MDSTR_SCHEME || + if (mdelem->key == GRPC_MDSTR_SCHEME || mdelem->key == GRPC_MDSTR_AUTHORITY) { /* Cronet populates these fields on its own */ continue; } + if (mdelem->key == GRPC_MDSTR_METHOD) { + if (mdelem->value == GRPC_MDSTR_PUT) { + *method = "PUT"; + } else { + /* POST method in default*/ + *method = "POST"; + } + continue; + } if (mdelem->key == GRPC_MDSTR_PATH) { /* Create URL by appending :path value to the hostname */ gpr_asprintf(pp_url, "https://%s%s", host, value); @@ -759,15 +780,16 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, s->cbs = cronet_bidirectional_stream_create(s->curr_ct.engine, s->curr_gs, &cronet_callbacks); CRONET_LOG(GPR_DEBUG, "%p = cronet_bidirectional_stream_create()", s->cbs); - char *url; + char *url = NULL; + const char *method = "POST"; s->header_array.headers = NULL; convert_metadata_to_cronet_headers( stream_op->send_initial_metadata->list.head, s->curr_ct.host, &url, - &s->header_array.headers, &s->header_array.count); + &s->header_array.headers, &s->header_array.count, &method); s->header_array.capacity = s->header_array.count; CRONET_LOG(GPR_DEBUG, "cronet_bidirectional_stream_start(%p, %s)", s->cbs, url); - cronet_bidirectional_stream_start(s->cbs, url, 0, "POST", &s->header_array, + cronet_bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false); stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true; result = ACTION_TAKEN_WITH_CALLBACK; @@ -901,6 +923,7 @@ static enum e_op_result execute_stream_op(grpc_exec_ctx *exec_ctx, uint8_t *dst_p = GPR_SLICE_START_PTR(read_data_slice); memcpy(dst_p, stream_state->rs.read_buffer, (size_t)stream_state->rs.length_field); + free_read_buffer(s); gpr_slice_buffer_init(&stream_state->rs.read_slice_buffer); gpr_slice_buffer_add(&stream_state->rs.read_slice_buffer, read_data_slice); |