aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--include/grpc++/impl/codegen/sync_stream.h20
-rw-r--r--src/core/census/grpc_filter.c4
-rw-r--r--src/core/channel/http_client_filter.c4
-rw-r--r--src/core/channel/http_server_filter.c4
-rw-r--r--src/core/channel/subchannel_call_holder.c6
-rw-r--r--src/core/security/server_auth_filter.c4
-rw-r--r--src/core/surface/call.c154
-rw-r--r--src/core/surface/lame_client.c3
-rw-r--r--src/core/surface/server.c4
-rw-r--r--src/core/transport/chttp2/internal.h2
-rw-r--r--src/core/transport/chttp2_transport.c15
-rw-r--r--src/core/transport/transport.c1
-rw-r--r--src/core/transport/transport.h5
-rw-r--r--src/ruby/spec/client_server_spec.rb3
-rw-r--r--test/core/fling/client.c12
-rw-r--r--test/distrib/python/distribtest.py2
16 files changed, 158 insertions, 85 deletions
diff --git a/include/grpc++/impl/codegen/sync_stream.h b/include/grpc++/impl/codegen/sync_stream.h
index 33d25e837c..9ae48bd23d 100644
--- a/include/grpc++/impl/codegen/sync_stream.h
+++ b/include/grpc++/impl/codegen/sync_stream.h
@@ -193,6 +193,15 @@ class ClientWriter : public ClientWriterInterface<W> {
cq_.Pluck(&ops);
}
+ void WaitForInitialMetadata() {
+ GPR_ASSERT(!context_->initial_metadata_received_);
+
+ CallOpSet<CallOpRecvInitialMetadata> ops;
+ ops.RecvInitialMetadata(context_);
+ call_.PerformOps(&ops);
+ cq_.Pluck(&ops); // status ignored
+ }
+
using WriterInterface<W>::Write;
bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
CallOpSet<CallOpSendMessage> ops;
@@ -213,6 +222,9 @@ class ClientWriter : public ClientWriterInterface<W> {
/// Read the final response and wait for the final status.
Status Finish() GRPC_OVERRIDE {
Status status;
+ if (!context_->initial_metadata_received_) {
+ finish_ops_.RecvInitialMetadata(context_);
+ }
finish_ops_.ClientRecvStatus(context_, &status);
call_.PerformOps(&finish_ops_);
GPR_ASSERT(cq_.Pluck(&finish_ops_));
@@ -221,7 +233,8 @@ class ClientWriter : public ClientWriterInterface<W> {
private:
ClientContext* context_;
- CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
+ CallOpClientRecvStatus> finish_ops_;
CompletionQueue cq_;
Call call_;
};
@@ -292,7 +305,10 @@ class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
}
Status Finish() GRPC_OVERRIDE {
- CallOpSet<CallOpClientRecvStatus> ops;
+ CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> ops;
+ if (!context_->initial_metadata_received_) {
+ ops.RecvInitialMetadata(context_);
+ }
Status status;
ops.ClientRecvStatus(context_, &status);
call_.PerformOps(&ops);
diff --git a/src/core/census/grpc_filter.c b/src/core/census/grpc_filter.c
index a8db32b9d5..c8aaf31e2d 100644
--- a/src/core/census/grpc_filter.c
+++ b/src/core/census/grpc_filter.c
@@ -107,8 +107,8 @@ static void server_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata) {
/* substitute our callback for the op callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->finish_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->finish_recv;
}
}
diff --git a/src/core/channel/http_client_filter.c b/src/core/channel/http_client_filter.c
index 43eee046b8..1aa27208c2 100644
--- a/src/core/channel/http_client_filter.c
+++ b/src/core/channel/http_client_filter.c
@@ -127,8 +127,8 @@ static void hc_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->hc_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hc_on_recv;
}
}
diff --git a/src/core/channel/http_server_filter.c b/src/core/channel/http_server_filter.c
index bb75323933..370f8dbe42 100644
--- a/src/core/channel/http_server_filter.c
+++ b/src/core/channel/http_server_filter.c
@@ -186,8 +186,8 @@ static void hs_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->hs_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->hs_on_recv;
}
}
diff --git a/src/core/channel/subchannel_call_holder.c b/src/core/channel/subchannel_call_holder.c
index 3ad9fd9efb..81297c8d44 100644
--- a/src/core/channel/subchannel_call_holder.c
+++ b/src/core/channel/subchannel_call_holder.c
@@ -241,10 +241,8 @@ static void fail_locked(grpc_exec_ctx *exec_ctx,
grpc_subchannel_call_holder *holder) {
size_t i;
for (i = 0; i < holder->waiting_ops_count; i++) {
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, false,
- NULL);
- grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
- false, NULL);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx,
+ &holder->waiting_ops[i]);
}
holder->waiting_ops_count = 0;
}
diff --git a/src/core/security/server_auth_filter.c b/src/core/security/server_auth_filter.c
index 4c78711387..3d8e5e8d35 100644
--- a/src/core/security/server_auth_filter.c
+++ b/src/core/security/server_auth_filter.c
@@ -176,8 +176,8 @@ static void set_recv_ops_md_callbacks(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
/* substitute our callback for the higher callback */
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv = op->on_complete;
- op->on_complete = &calld->auth_on_recv;
+ calld->on_done_recv = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->auth_on_recv;
calld->transport_op = *op;
}
}
diff --git a/src/core/surface/call.c b/src/core/surface/call.c
index 9495e748b5..1b117aa6b8 100644
--- a/src/core/surface/call.c
+++ b/src/core/surface/call.c
@@ -159,6 +159,9 @@ struct grpc_call {
uint8_t receiving_message;
uint8_t received_final_op;
+ /* have we received initial metadata */
+ bool has_initial_md_been_received;
+
batch_control active_batches[MAX_CONCURRENT_BATCHES];
/* first idx: is_receiving, second idx: is_trailing */
@@ -200,6 +203,7 @@ struct grpc_call {
gpr_slice receiving_slice;
grpc_closure receiving_slice_ready;
grpc_closure receiving_stream_ready;
+ grpc_closure receiving_initial_metadata_ready;
uint32_t test_only_last_message_flags;
union {
@@ -212,6 +216,11 @@ struct grpc_call {
int *cancelled;
} server;
} final_op;
+
+ struct {
+ void *bctlp;
+ bool success;
+ } saved_receiving_stream_ready_ctx;
};
#define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
@@ -993,6 +1002,94 @@ static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
}
}
+static void process_data_after_md(grpc_exec_ctx *exec_ctx, batch_control *bctl,
+ bool success) {
+ grpc_call *call = bctl->call;
+ if (call->receiving_stream == NULL) {
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else if (call->receiving_stream->length >
+ grpc_channel_get_max_message_length(call->channel)) {
+ cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
+ "Max message size exceeded");
+ grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
+ call->receiving_stream = NULL;
+ *call->receiving_buffer = NULL;
+ call->receiving_message = 0;
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+ } else {
+ call->test_only_last_message_flags = call->receiving_stream->flags;
+ if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
+ (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
+ *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
+ NULL, 0, call->compression_algorithm);
+ } else {
+ *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
+ }
+ grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
+ bctl);
+ continue_receiving_slices(exec_ctx, bctl);
+ /* early out */
+ return;
+ }
+}
+
+static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
+ bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&bctl->call->mu);
+ if (bctl->call->has_initial_md_been_received) {
+ gpr_mu_unlock(&bctl->call->mu);
+ process_data_after_md(exec_ctx, bctlp, success);
+ } else {
+ call->saved_receiving_stream_ready_ctx.bctlp = bctlp;
+ call->saved_receiving_stream_ready_ctx.success = success;
+ gpr_mu_unlock(&bctl->call->mu);
+ }
+}
+
+static void receiving_initial_metadata_ready(grpc_exec_ctx *exec_ctx,
+ void *bctlp, bool success) {
+ batch_control *bctl = bctlp;
+ grpc_call *call = bctl->call;
+
+ gpr_mu_lock(&call->mu);
+
+ grpc_metadata_batch *md =
+ &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ grpc_metadata_batch_filter(md, recv_initial_filter, call);
+ call->has_initial_md_been_received = true;
+
+ if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
+ 0 &&
+ !call->is_client) {
+ GPR_TIMER_BEGIN("set_deadline_alarm", 0);
+ set_deadline_alarm(exec_ctx, call, md->deadline);
+ GPR_TIMER_END("set_deadline_alarm", 0);
+ }
+
+ if (call->saved_receiving_stream_ready_ctx.bctlp != NULL) {
+ grpc_closure *saved_rsr_closure = grpc_closure_create(
+ receiving_stream_ready, call->saved_receiving_stream_ready_ctx.bctlp);
+ grpc_exec_ctx_enqueue(exec_ctx, saved_rsr_closure,
+ call->saved_receiving_stream_ready_ctx.success, NULL);
+ call->saved_receiving_stream_ready_ctx.bctlp = NULL;
+ }
+
+ gpr_mu_unlock(&call->mu);
+
+ if (gpr_unref(&bctl->steps_to_complete)) {
+ post_batch_completion(exec_ctx, bctl);
+ }
+}
+
static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
batch_control *bctl = bctlp;
grpc_call *call = bctl->call;
@@ -1011,19 +1108,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
grpc_metadata_batch_destroy(
&call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
}
- if (bctl->recv_initial_metadata) {
- grpc_metadata_batch *md =
- &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
- grpc_metadata_batch_filter(md, recv_initial_filter, call);
-
- if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
- 0 &&
- !call->is_client) {
- GPR_TIMER_BEGIN("set_deadline_alarm", 0);
- set_deadline_alarm(exec_ctx, call, md->deadline);
- GPR_TIMER_END("set_deadline_alarm", 0);
- }
- }
if (bctl->recv_final_op) {
grpc_metadata_batch *md =
&call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
@@ -1065,45 +1149,6 @@ static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, bool success) {
}
}
-static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
- bool success) {
- batch_control *bctl = bctlp;
- grpc_call *call = bctl->call;
-
- if (call->receiving_stream == NULL) {
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else if (call->receiving_stream->length >
- grpc_channel_get_max_message_length(call->channel)) {
- cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
- "Max message size exceeded");
- grpc_byte_stream_destroy(exec_ctx, call->receiving_stream);
- call->receiving_stream = NULL;
- *call->receiving_buffer = NULL;
- call->receiving_message = 0;
- if (gpr_unref(&bctl->steps_to_complete)) {
- post_batch_completion(exec_ctx, bctl);
- }
- } else {
- call->test_only_last_message_flags = call->receiving_stream->flags;
- if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
- (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
- *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
- NULL, 0, call->compression_algorithm);
- } else {
- *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
- }
- grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
- bctl);
- continue_receiving_slices(exec_ctx, bctl);
- /* early out */
- return;
- }
-}
-
static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
grpc_call *call, const grpc_op *ops,
size_t nops, void *notify_tag,
@@ -1273,9 +1318,14 @@ static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
}
call->received_initial_metadata = 1;
call->buffered_metadata[0] = op->data.recv_initial_metadata;
+ grpc_closure_init(&call->receiving_initial_metadata_ready,
+ receiving_initial_metadata_ready, bctl);
bctl->recv_initial_metadata = 1;
stream_op.recv_initial_metadata =
&call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
+ stream_op.recv_initial_metadata_ready =
+ &call->receiving_initial_metadata_ready;
+ num_completion_callbacks_needed++;
break;
case GRPC_OP_RECV_MESSAGE:
/* Flag validation: currently allow no flags */
diff --git a/src/core/surface/lame_client.c b/src/core/surface/lame_client.c
index 705996cad3..537069e984 100644
--- a/src/core/surface/lame_client.c
+++ b/src/core/surface/lame_client.c
@@ -78,8 +78,7 @@ static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
} else if (op->recv_trailing_metadata != NULL) {
fill_metadata(elem, op->recv_trailing_metadata);
}
- grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
- grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
}
static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
diff --git a/src/core/surface/server.c b/src/core/surface/server.c
index 42cffccb4c..fb5e0d4b9e 100644
--- a/src/core/surface/server.c
+++ b/src/core/surface/server.c
@@ -596,8 +596,8 @@ static void server_mutate_op(grpc_call_element *elem,
if (op->recv_initial_metadata != NULL) {
calld->recv_initial_metadata = op->recv_initial_metadata;
- calld->on_done_recv_initial_metadata = op->on_complete;
- op->on_complete = &calld->server_on_recv_initial_metadata;
+ calld->on_done_recv_initial_metadata = op->recv_initial_metadata_ready;
+ op->recv_initial_metadata_ready = &calld->server_on_recv_initial_metadata;
}
}
diff --git a/src/core/transport/chttp2/internal.h b/src/core/transport/chttp2/internal.h
index c611496e7e..0e1e2c4265 100644
--- a/src/core/transport/chttp2/internal.h
+++ b/src/core/transport/chttp2/internal.h
@@ -385,7 +385,7 @@ typedef struct {
grpc_closure *send_trailing_metadata_finished;
grpc_metadata_batch *recv_initial_metadata;
- grpc_closure *recv_initial_metadata_finished;
+ grpc_closure *recv_initial_metadata_ready;
grpc_byte_stream **recv_message;
grpc_closure *recv_message_ready;
grpc_metadata_batch *recv_trailing_metadata;
diff --git a/src/core/transport/chttp2_transport.c b/src/core/transport/chttp2_transport.c
index 9298573c7f..617d98875c 100644
--- a/src/core/transport/chttp2_transport.c
+++ b/src/core/transport/chttp2_transport.c
@@ -544,7 +544,7 @@ static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
GPR_ASSERT(s->global.send_message_finished == NULL);
GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
- GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
+ GPR_ASSERT(s->global.recv_initial_metadata_ready == NULL);
GPR_ASSERT(s->global.recv_message_ready == NULL);
GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
@@ -863,9 +863,9 @@ static void perform_stream_op_locked(
}
if (op->recv_initial_metadata != NULL) {
- GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
- stream_global->recv_initial_metadata_finished =
- add_closure_barrier(on_complete);
+ GPR_ASSERT(stream_global->recv_initial_metadata_ready == NULL);
+ stream_global->recv_initial_metadata_ready =
+ op->recv_initial_metadata_ready;
stream_global->recv_initial_metadata = op->recv_initial_metadata;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}
@@ -1009,13 +1009,14 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
grpc_byte_stream *bs;
while (
grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
- if (stream_global->recv_initial_metadata_finished != NULL &&
+ if (stream_global->recv_initial_metadata_ready != NULL &&
stream_global->published_initial_metadata) {
grpc_chttp2_incoming_metadata_buffer_publish(
&stream_global->received_initial_metadata,
stream_global->recv_initial_metadata);
- grpc_chttp2_complete_closure_step(
- exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
+ grpc_exec_ctx_enqueue(
+ exec_ctx, stream_global->recv_initial_metadata_ready, true, NULL);
+ stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
if (stream_global->incoming_frames.head != NULL) {
diff --git a/src/core/transport/transport.c b/src/core/transport/transport.c
index 08d685668c..6e154b629a 100644
--- a/src/core/transport/transport.c
+++ b/src/core/transport/transport.c
@@ -126,6 +126,7 @@ char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
void grpc_transport_stream_op_finish_with_failure(
grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, false, NULL);
+ grpc_exec_ctx_enqueue(exec_ctx, op->recv_initial_metadata_ready, false, NULL);
grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, false, NULL);
}
diff --git a/src/core/transport/transport.h b/src/core/transport/transport.h
index f5cac77adc..8902c5d2f6 100644
--- a/src/core/transport/transport.h
+++ b/src/core/transport/transport.h
@@ -92,6 +92,8 @@ typedef struct grpc_transport_stream_op {
/** Receive initial metadata from the stream, into provided metadata batch. */
grpc_metadata_batch *recv_initial_metadata;
+ /** Should be enqueued when initial metadata is ready to be processed. */
+ grpc_closure *recv_initial_metadata_ready;
/** Receive message data from the stream, into provided byte stream. */
grpc_byte_stream **recv_message;
@@ -103,7 +105,8 @@ typedef struct grpc_transport_stream_op {
grpc_metadata_batch *recv_trailing_metadata;
/** Should be enqueued when all requested operations (excluding recv_message
- which has its own closure) in a given batch have been completed. */
+ and recv_initial_metadata which have their own closures) in a given batch
+ have been completed. */
grpc_closure *on_complete;
/** If != GRPC_STATUS_OK, cancel this stream */
diff --git a/src/ruby/spec/client_server_spec.rb b/src/ruby/spec/client_server_spec.rb
index 594fda1cd3..7ef534571f 100644
--- a/src/ruby/spec/client_server_spec.rb
+++ b/src/ruby/spec/client_server_spec.rb
@@ -1,4 +1,4 @@
-# Copyright 2015, Google Inc.
+# Copyright 2015-2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -198,6 +198,7 @@ shared_examples 'basic GRPC message delivery is OK' do
# confirm the client can receive the server response and status.
client_ops = {
CallOps::SEND_CLOSE_FROM_CLIENT => nil,
+ CallOps::RECV_INITIAL_METADATA => nil,
CallOps::RECV_MESSAGE => nil,
CallOps::RECV_STATUS_ON_CLIENT => nil
}
diff --git a/test/core/fling/client.c b/test/core/fling/client.c
index 02db681cfd..b36aef3093 100644
--- a/test/core/fling/client.c
+++ b/test/core/fling/client.c
@@ -51,7 +51,7 @@ static grpc_channel *channel;
static grpc_completion_queue *cq;
static grpc_call *call;
static grpc_op ops[6];
-static grpc_op stream_init_op;
+static grpc_op stream_init_ops[2];
static grpc_op stream_step_ops[2];
static grpc_metadata_array initial_metadata_recv;
static grpc_metadata_array trailing_metadata_recv;
@@ -105,13 +105,17 @@ static void step_ping_pong_request(void) {
}
static void init_ping_pong_stream(void) {
+ grpc_metadata_array_init(&initial_metadata_recv);
+
grpc_call_error error;
call = grpc_channel_create_call(channel, NULL, GRPC_PROPAGATE_DEFAULTS, cq,
"/Reflector/reflectStream", "localhost",
gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
- stream_init_op.op = GRPC_OP_SEND_INITIAL_METADATA;
- stream_init_op.data.send_initial_metadata.count = 0;
- error = grpc_call_start_batch(call, &stream_init_op, 1, (void *)1, NULL);
+ stream_init_ops[0].op = GRPC_OP_SEND_INITIAL_METADATA;
+ stream_init_ops[0].data.send_initial_metadata.count = 0;
+ stream_init_ops[1].op = GRPC_OP_RECV_INITIAL_METADATA;
+ stream_init_ops[1].data.recv_initial_metadata = &initial_metadata_recv;
+ error = grpc_call_start_batch(call, stream_init_ops, 2, (void *)1, NULL);
GPR_ASSERT(GRPC_CALL_OK == error);
grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
diff --git a/test/distrib/python/distribtest.py b/test/distrib/python/distribtest.py
index 66c1f88796..dc20688140 100644
--- a/test/distrib/python/distribtest.py
+++ b/test/distrib/python/distribtest.py
@@ -1,4 +1,4 @@
-# Copyright 2015-2016, Google Inc.
+# Copyright 2016, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without