-rw-r--r--  Makefile | 60
-rw-r--r--  build.json | 28
-rw-r--r--  examples/tips/publisher_test.cc | 2
-rw-r--r--  examples/tips/subscriber_test.cc | 2
-rw-r--r--  include/grpc++/async_server_context.h | 95
-rw-r--r--  include/grpc++/call.h (renamed from test/cpp/end2end/async_test_server.h) | 80
-rw-r--r--  include/grpc++/channel_interface.h | 24
-rw-r--r--  include/grpc++/completion_queue.h | 69
-rw-r--r--  include/grpc++/config.h | 3
-rw-r--r--  include/grpc++/impl/client_unary_call.h | 67
-rw-r--r--  include/grpc++/impl/rpc_method.h | 4
-rw-r--r--  include/grpc++/impl/rpc_service_method.h | 27
-rw-r--r--  include/grpc++/impl/service_type.h (renamed from include/grpc++/async_server.h) | 41
-rw-r--r--  include/grpc++/server.h | 13
-rw-r--r--  include/grpc++/server_builder.h | 9
-rw-r--r--  include/grpc++/server_context.h | 8
-rw-r--r--  include/grpc++/stream.h | 408
-rw-r--r--  include/grpc/support/cpu.h (renamed from src/core/support/cpu.h) | 8
-rw-r--r--  src/compiler/cpp_generator.cc | 200
-rw-r--r--  src/core/statistics/census_log.c | 2
-rw-r--r--  src/core/support/cpu_linux.c | 2
-rw-r--r--  src/cpp/client/channel.cc | 113
-rw-r--r--  src/cpp/client/channel.h | 17
-rw-r--r--  src/cpp/client/client_unary_call.cc (renamed from src/cpp/server/server_rpc_handler.h) | 46
-rw-r--r--  src/cpp/common/call.cc (renamed from include/grpc++/stream_context_interface.h) | 31
-rw-r--r--  src/cpp/common/completion_queue.cc | 89
-rw-r--r--  src/cpp/server/async_server.cc | 89
-rw-r--r--  src/cpp/server/server.cc | 85
-rw-r--r--  src/cpp/server/server_builder.cc | 38
-rw-r--r--  src/cpp/server/server_rpc_handler.cc | 140
-rw-r--r--  src/cpp/server/thread_pool.h | 4
-rw-r--r--  src/cpp/stream/stream_context.cc | 179
-rw-r--r--  src/cpp/stream/stream_context.h | 99
-rw-r--r--  test/core/statistics/census_log_tests.c | 2
-rw-r--r--  test/cpp/end2end/async_test_server.cc | 154
-rw-r--r--  test/cpp/end2end/end2end_test.cc | 14
-rw-r--r--  test/cpp/end2end/sync_client_async_server_test.cc | 236
-rw-r--r--  test/cpp/interop/client.cc | 10
-rw-r--r--  test/cpp/interop/server.cc | 2
-rw-r--r--  test/cpp/qps/server.cc | 2
-rw-r--r--  tools/run_tests/tests.json | 4
41 files changed, 913 insertions(+), 1593 deletions(-)
diff --git a/Makefile b/Makefile
index 1547b49616..3a61f79fb3 100644
--- a/Makefile
+++ b/Makefile
@@ -391,7 +391,6 @@ qps_client: bins/$(CONFIG)/qps_client
qps_server: bins/$(CONFIG)/qps_server
ruby_plugin: bins/$(CONFIG)/ruby_plugin
status_test: bins/$(CONFIG)/status_test
-sync_client_async_server_test: bins/$(CONFIG)/sync_client_async_server_test
thread_pool_test: bins/$(CONFIG)/thread_pool_test
tips_client: bins/$(CONFIG)/tips_client
tips_publisher_test: bins/$(CONFIG)/tips_publisher_test
@@ -725,7 +724,7 @@ buildtests: buildtests_c buildtests_cxx
buildtests_c: privatelibs_c bins/$(CONFIG)/alarm_heap_test bins/$(CONFIG)/alarm_list_test bins/$(CONFIG)/alarm_test bins/$(CONFIG)/alpn_test bins/$(CONFIG)/bin_encoder_test bins/$(CONFIG)/census_hash_table_test bins/$(CONFIG)/census_statistics_multiple_writers_circular_buffer_test bins/$(CONFIG)/census_statistics_multiple_writers_test bins/$(CONFIG)/census_statistics_performance_test bins/$(CONFIG)/census_statistics_quick_test bins/$(CONFIG)/census_statistics_small_log_test bins/$(CONFIG)/census_stub_test bins/$(CONFIG)/census_window_stats_test bins/$(CONFIG)/chttp2_status_conversion_test bins/$(CONFIG)/chttp2_stream_encoder_test bins/$(CONFIG)/chttp2_stream_map_test bins/$(CONFIG)/chttp2_transport_end2end_test bins/$(CONFIG)/dualstack_socket_test bins/$(CONFIG)/echo_client bins/$(CONFIG)/echo_server bins/$(CONFIG)/echo_test bins/$(CONFIG)/fd_posix_test bins/$(CONFIG)/fling_client bins/$(CONFIG)/fling_server bins/$(CONFIG)/fling_stream_test bins/$(CONFIG)/fling_test bins/$(CONFIG)/gpr_cancellable_test bins/$(CONFIG)/gpr_cmdline_test bins/$(CONFIG)/gpr_env_test bins/$(CONFIG)/gpr_file_test bins/$(CONFIG)/gpr_histogram_test bins/$(CONFIG)/gpr_host_port_test bins/$(CONFIG)/gpr_log_test bins/$(CONFIG)/gpr_slice_buffer_test bins/$(CONFIG)/gpr_slice_test bins/$(CONFIG)/gpr_string_test bins/$(CONFIG)/gpr_sync_test bins/$(CONFIG)/gpr_thd_test bins/$(CONFIG)/gpr_time_test bins/$(CONFIG)/gpr_useful_test bins/$(CONFIG)/grpc_base64_test bins/$(CONFIG)/grpc_byte_buffer_reader_test bins/$(CONFIG)/grpc_channel_stack_test bins/$(CONFIG)/grpc_completion_queue_test bins/$(CONFIG)/grpc_credentials_test bins/$(CONFIG)/grpc_json_token_test bins/$(CONFIG)/grpc_stream_op_test bins/$(CONFIG)/hpack_parser_test bins/$(CONFIG)/hpack_table_test bins/$(CONFIG)/httpcli_format_request_test bins/$(CONFIG)/httpcli_parser_test bins/$(CONFIG)/httpcli_test bins/$(CONFIG)/json_rewrite bins/$(CONFIG)/json_rewrite_test bins/$(CONFIG)/json_test bins/$(CONFIG)/lame_client_test bins/$(CONFIG)/message_compress_test bins/$(CONFIG)/metadata_buffer_test bins/$(CONFIG)/murmur_hash_test bins/$(CONFIG)/no_server_test bins/$(CONFIG)/poll_kick_posix_test bins/$(CONFIG)/resolve_address_test bins/$(CONFIG)/secure_endpoint_test bins/$(CONFIG)/sockaddr_utils_test bins/$(CONFIG)/tcp_client_posix_test bins/$(CONFIG)/tcp_posix_test bins/$(CONFIG)/tcp_server_posix_test bins/$(CONFIG)/time_averaged_stats_test bins/$(CONFIG)/time_test bins/$(CONFIG)/timeout_encoding_test bins/$(CONFIG)/transport_metadata_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_invoke_test bins/$(CONFIG)/chttp2_fake_security_cancel_before_invoke_test bins/$(CONFIG)/chttp2_fake_security_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_fake_security_census_simple_request_test bins/$(CONFIG)/chttp2_fake_security_disappearing_server_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_fake_security_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_fake_security_invoke_large_request_test bins/$(CONFIG)/chttp2_fake_security_max_concurrent_streams_test bins/$(CONFIG)/chttp2_fake_security_no_op_test bins/$(CONFIG)/chttp2_fake_security_ping_pong_streaming_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_binary_metadata_and_payload_test 
bins/$(CONFIG)/chttp2_fake_security_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_payload_test bins/$(CONFIG)/chttp2_fake_security_request_with_large_metadata_test bins/$(CONFIG)/chttp2_fake_security_request_with_payload_test bins/$(CONFIG)/chttp2_fake_security_simple_delayed_request_test bins/$(CONFIG)/chttp2_fake_security_simple_request_test bins/$(CONFIG)/chttp2_fake_security_thread_stress_test bins/$(CONFIG)/chttp2_fake_security_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_fake_security_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_fake_security_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_fake_security_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_fake_security_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_fake_security_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_fake_security_no_op_legacy_test bins/$(CONFIG)/chttp2_fake_security_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_fake_security_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_fake_security_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_simple_request_legacy_test bins/$(CONFIG)/chttp2_fake_security_thread_stress_legacy_test bins/$(CONFIG)/chttp2_fake_security_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_fullstack_no_op_test bins/$(CONFIG)/chttp2_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_fullstack_request_with_payload_test 
bins/$(CONFIG)/chttp2_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_fullstack_simple_request_test bins/$(CONFIG)/chttp2_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_no_op_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_request_test 
bins/$(CONFIG)/chttp2_simple_ssl_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_before_invoke_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_census_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_disappearing_server_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_invoke_large_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_max_concurrent_streams_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_no_op_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_ping_pong_streaming_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_metadata_and_payload_test 
bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_large_metadata_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_payload_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_delayed_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_request_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_thread_stress_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_no_op_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_simple_request_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_thread_stress_legacy_test bins/$(CONFIG)/chttp2_simple_ssl_with_oauth2_fullstack_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_invoke_test bins/$(CONFIG)/chttp2_socket_pair_cancel_before_invoke_test bins/$(CONFIG)/chttp2_socket_pair_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_socket_pair_census_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_disappearing_server_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_socket_pair_graceful_server_shutdown_test 
bins/$(CONFIG)/chttp2_socket_pair_invoke_large_request_test bins/$(CONFIG)/chttp2_socket_pair_max_concurrent_streams_test bins/$(CONFIG)/chttp2_socket_pair_no_op_test bins/$(CONFIG)/chttp2_socket_pair_ping_pong_streaming_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_request_with_large_metadata_test bins/$(CONFIG)/chttp2_socket_pair_request_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_simple_delayed_request_test bins/$(CONFIG)/chttp2_socket_pair_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_thread_stress_test bins/$(CONFIG)/chttp2_socket_pair_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_socket_pair_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_socket_pair_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_socket_pair_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_socket_pair_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_socket_pair_no_op_legacy_test bins/$(CONFIG)/chttp2_socket_pair_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_socket_pair_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_thread_stress_legacy_test bins/$(CONFIG)/chttp2_socket_pair_writes_done_hangs_with_pending_read_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_and_writes_closed_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_invoke_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_before_invoke_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_in_a_vacuum_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_census_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_disappearing_server_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_inflight_calls_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_tags_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_graceful_server_shutdown_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_invoke_large_request_test 
bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_max_concurrent_streams_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_no_op_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_ping_pong_streaming_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_binary_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_metadata_and_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_large_metadata_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_payload_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_delayed_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_request_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_thread_stress_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_accept_and_writes_closed_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_after_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_before_invoke_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_cancel_in_a_vacuum_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_census_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_disappearing_server_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_inflight_calls_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_early_server_shutdown_finishes_tags_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_graceful_server_shutdown_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_invoke_large_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_max_concurrent_streams_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_no_op_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_ping_pong_streaming_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_binary_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_response_with_trailing_metadata_and_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_large_metadata_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_request_with_payload_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_delayed_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_simple_request_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_thread_stress_legacy_test bins/$(CONFIG)/chttp2_socket_pair_one_byte_at_a_time_writes_done_hangs_with_pending_read_legacy_test
-buildtests_cxx: privatelibs_cxx bins/$(CONFIG)/channel_arguments_test bins/$(CONFIG)/credentials_test bins/$(CONFIG)/end2end_test bins/$(CONFIG)/interop_client bins/$(CONFIG)/interop_server bins/$(CONFIG)/qps_client bins/$(CONFIG)/qps_server bins/$(CONFIG)/status_test bins/$(CONFIG)/sync_client_async_server_test bins/$(CONFIG)/thread_pool_test bins/$(CONFIG)/tips_client bins/$(CONFIG)/tips_publisher_test bins/$(CONFIG)/tips_subscriber_test
+buildtests_cxx: privatelibs_cxx bins/$(CONFIG)/channel_arguments_test bins/$(CONFIG)/credentials_test bins/$(CONFIG)/end2end_test bins/$(CONFIG)/interop_client bins/$(CONFIG)/interop_server bins/$(CONFIG)/qps_client bins/$(CONFIG)/qps_server bins/$(CONFIG)/status_test bins/$(CONFIG)/thread_pool_test bins/$(CONFIG)/tips_client bins/$(CONFIG)/tips_publisher_test bins/$(CONFIG)/tips_subscriber_test
test: test_c test_cxx
@@ -1439,8 +1438,6 @@ test_cxx: buildtests_cxx
$(Q) ./bins/$(CONFIG)/qps_server || ( echo test qps_server failed ; exit 1 )
$(E) "[RUN] Testing status_test"
$(Q) ./bins/$(CONFIG)/status_test || ( echo test status_test failed ; exit 1 )
- $(E) "[RUN] Testing sync_client_async_server_test"
- $(Q) ./bins/$(CONFIG)/sync_client_async_server_test || ( echo test sync_client_async_server_test failed ; exit 1 )
$(E) "[RUN] Testing thread_pool_test"
$(Q) ./bins/$(CONFIG)/thread_pool_test || ( echo test thread_pool_test failed ; exit 1 )
$(E) "[RUN] Testing tips_publisher_test"
@@ -2611,27 +2608,23 @@ LIBGRPC++_SRC = \
src/cpp/client/channel.cc \
src/cpp/client/channel_arguments.cc \
src/cpp/client/client_context.cc \
+ src/cpp/client/client_unary_call.cc \
src/cpp/client/create_channel.cc \
src/cpp/client/credentials.cc \
src/cpp/client/internal_stub.cc \
+ src/cpp/common/call.cc \
src/cpp/common/completion_queue.cc \
src/cpp/common/rpc_method.cc \
src/cpp/proto/proto_utils.cc \
- src/cpp/server/async_server.cc \
- src/cpp/server/async_server_context.cc \
src/cpp/server/server.cc \
src/cpp/server/server_builder.cc \
src/cpp/server/server_context_impl.cc \
src/cpp/server/server_credentials.cc \
- src/cpp/server/server_rpc_handler.cc \
src/cpp/server/thread_pool.cc \
- src/cpp/stream/stream_context.cc \
src/cpp/util/status.cc \
src/cpp/util/time.cc \
PUBLIC_HEADERS_CXX += \
- include/grpc++/async_server.h \
- include/grpc++/async_server_context.h \
include/grpc++/channel_arguments.h \
include/grpc++/channel_interface.h \
include/grpc++/client_context.h \
@@ -2639,6 +2632,7 @@ PUBLIC_HEADERS_CXX += \
include/grpc++/config.h \
include/grpc++/create_channel.h \
include/grpc++/credentials.h \
+ include/grpc++/impl/client_unary_call.h \
include/grpc++/impl/internal_stub.h \
include/grpc++/impl/rpc_method.h \
include/grpc++/impl/rpc_service_method.h \
@@ -2670,21 +2664,19 @@ ifneq ($(OPENSSL_DEP),)
src/cpp/client/channel.cc: $(OPENSSL_DEP)
src/cpp/client/channel_arguments.cc: $(OPENSSL_DEP)
src/cpp/client/client_context.cc: $(OPENSSL_DEP)
+src/cpp/client/client_unary_call.cc: $(OPENSSL_DEP)
src/cpp/client/create_channel.cc: $(OPENSSL_DEP)
src/cpp/client/credentials.cc: $(OPENSSL_DEP)
src/cpp/client/internal_stub.cc: $(OPENSSL_DEP)
+src/cpp/common/call.cc: $(OPENSSL_DEP)
src/cpp/common/completion_queue.cc: $(OPENSSL_DEP)
src/cpp/common/rpc_method.cc: $(OPENSSL_DEP)
src/cpp/proto/proto_utils.cc: $(OPENSSL_DEP)
-src/cpp/server/async_server.cc: $(OPENSSL_DEP)
-src/cpp/server/async_server_context.cc: $(OPENSSL_DEP)
src/cpp/server/server.cc: $(OPENSSL_DEP)
src/cpp/server/server_builder.cc: $(OPENSSL_DEP)
src/cpp/server/server_context_impl.cc: $(OPENSSL_DEP)
src/cpp/server/server_credentials.cc: $(OPENSSL_DEP)
-src/cpp/server/server_rpc_handler.cc: $(OPENSSL_DEP)
src/cpp/server/thread_pool.cc: $(OPENSSL_DEP)
-src/cpp/stream/stream_context.cc: $(OPENSSL_DEP)
src/cpp/util/status.cc: $(OPENSSL_DEP)
src/cpp/util/time.cc: $(OPENSSL_DEP)
endif
@@ -2730,21 +2722,19 @@ endif
objs/$(CONFIG)/src/cpp/client/channel.o:
objs/$(CONFIG)/src/cpp/client/channel_arguments.o:
objs/$(CONFIG)/src/cpp/client/client_context.o:
+objs/$(CONFIG)/src/cpp/client/client_unary_call.o:
objs/$(CONFIG)/src/cpp/client/create_channel.o:
objs/$(CONFIG)/src/cpp/client/credentials.o:
objs/$(CONFIG)/src/cpp/client/internal_stub.o:
+objs/$(CONFIG)/src/cpp/common/call.o:
objs/$(CONFIG)/src/cpp/common/completion_queue.o:
objs/$(CONFIG)/src/cpp/common/rpc_method.o:
objs/$(CONFIG)/src/cpp/proto/proto_utils.o:
-objs/$(CONFIG)/src/cpp/server/async_server.o:
-objs/$(CONFIG)/src/cpp/server/async_server_context.o:
objs/$(CONFIG)/src/cpp/server/server.o:
objs/$(CONFIG)/src/cpp/server/server_builder.o:
objs/$(CONFIG)/src/cpp/server/server_context_impl.o:
objs/$(CONFIG)/src/cpp/server/server_credentials.o:
-objs/$(CONFIG)/src/cpp/server/server_rpc_handler.o:
objs/$(CONFIG)/src/cpp/server/thread_pool.o:
-objs/$(CONFIG)/src/cpp/stream/stream_context.o:
objs/$(CONFIG)/src/cpp/util/status.o:
objs/$(CONFIG)/src/cpp/util/time.o:
@@ -2753,7 +2743,6 @@ LIBGRPC++_TEST_UTIL_SRC = \
gens/test/cpp/util/messages.pb.cc \
gens/test/cpp/util/echo.pb.cc \
gens/test/cpp/util/echo_duplicate.pb.cc \
- test/cpp/end2end/async_test_server.cc \
test/cpp/util/create_test_channel.cc \
@@ -2772,7 +2761,6 @@ ifneq ($(OPENSSL_DEP),)
test/cpp/util/messages.proto: $(OPENSSL_DEP)
test/cpp/util/echo.proto: $(OPENSSL_DEP)
test/cpp/util/echo_duplicate.proto: $(OPENSSL_DEP)
-test/cpp/end2end/async_test_server.cc: $(OPENSSL_DEP)
test/cpp/util/create_test_channel.cc: $(OPENSSL_DEP)
endif
@@ -2800,7 +2788,6 @@ endif
-objs/$(CONFIG)/test/cpp/end2end/async_test_server.o: gens/test/cpp/util/messages.pb.cc gens/test/cpp/util/echo.pb.cc gens/test/cpp/util/echo_duplicate.pb.cc
objs/$(CONFIG)/test/cpp/util/create_test_channel.o: gens/test/cpp/util/messages.pb.cc gens/test/cpp/util/echo.pb.cc gens/test/cpp/util/echo_duplicate.pb.cc
@@ -7088,37 +7075,6 @@ endif
endif
-SYNC_CLIENT_ASYNC_SERVER_TEST_SRC = \
- test/cpp/end2end/sync_client_async_server_test.cc \
-
-SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS = $(addprefix objs/$(CONFIG)/, $(addsuffix .o, $(basename $(SYNC_CLIENT_ASYNC_SERVER_TEST_SRC))))
-
-ifeq ($(NO_SECURE),true)
-
-# You can't build secure targets if you don't have OpenSSL with ALPN.
-
-bins/$(CONFIG)/sync_client_async_server_test: openssl_dep_error
-
-else
-
-bins/$(CONFIG)/sync_client_async_server_test: $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS) libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a
- $(E) "[LD] Linking $@"
- $(Q) mkdir -p `dirname $@`
- $(Q) $(LDXX) $(LDFLAGS) $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS) $(GTEST_LIB) libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a $(LDLIBSXX) $(LDLIBS) $(LDLIBS_SECURE) -o bins/$(CONFIG)/sync_client_async_server_test
-
-endif
-
-objs/$(CONFIG)/test/cpp/end2end/sync_client_async_server_test.o: libs/$(CONFIG)/libgrpc++_test_util.a libs/$(CONFIG)/libgrpc_test_util.a libs/$(CONFIG)/libgrpc++.a libs/$(CONFIG)/libgrpc.a libs/$(CONFIG)/libgpr_test_util.a libs/$(CONFIG)/libgpr.a
-
-deps_sync_client_async_server_test: $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS:.o=.dep)
-
-ifneq ($(NO_SECURE),true)
-ifneq ($(NO_DEPS),true)
--include $(SYNC_CLIENT_ASYNC_SERVER_TEST_OBJS:.o=.dep)
-endif
-endif
-
-
THREAD_POOL_TEST_SRC = \
test/cpp/server/thread_pool_test.cc \
diff --git a/build.json b/build.json
index 68110e4702..fdb32ebf71 100644
--- a/build.json
+++ b/build.json
@@ -378,8 +378,6 @@
"build": "all",
"language": "c++",
"public_headers": [
- "include/grpc++/async_server.h",
- "include/grpc++/async_server_context.h",
"include/grpc++/channel_arguments.h",
"include/grpc++/channel_interface.h",
"include/grpc++/client_context.h",
@@ -387,6 +385,7 @@
"include/grpc++/config.h",
"include/grpc++/create_channel.h",
"include/grpc++/credentials.h",
+ "include/grpc++/impl/client_unary_call.h",
"include/grpc++/impl/internal_stub.h",
"include/grpc++/impl/rpc_method.h",
"include/grpc++/impl/rpc_service_method.h",
@@ -401,30 +400,26 @@
"headers": [
"src/cpp/client/channel.h",
"src/cpp/proto/proto_utils.h",
- "src/cpp/server/server_rpc_handler.h",
"src/cpp/server/thread_pool.h",
- "src/cpp/stream/stream_context.h",
"src/cpp/util/time.h"
],
"src": [
"src/cpp/client/channel.cc",
"src/cpp/client/channel_arguments.cc",
"src/cpp/client/client_context.cc",
+ "src/cpp/client/client_unary_call.cc",
"src/cpp/client/create_channel.cc",
"src/cpp/client/credentials.cc",
"src/cpp/client/internal_stub.cc",
+ "src/cpp/common/call.cc",
"src/cpp/common/completion_queue.cc",
"src/cpp/common/rpc_method.cc",
"src/cpp/proto/proto_utils.cc",
- "src/cpp/server/async_server.cc",
- "src/cpp/server/async_server_context.cc",
"src/cpp/server/server.cc",
"src/cpp/server/server_builder.cc",
"src/cpp/server/server_context_impl.cc",
"src/cpp/server/server_credentials.cc",
- "src/cpp/server/server_rpc_handler.cc",
"src/cpp/server/thread_pool.cc",
- "src/cpp/stream/stream_context.cc",
"src/cpp/util/status.cc",
"src/cpp/util/time.cc"
],
@@ -442,7 +437,6 @@
"test/cpp/util/messages.proto",
"test/cpp/util/echo.proto",
"test/cpp/util/echo_duplicate.proto",
- "test/cpp/end2end/async_test_server.cc",
"test/cpp/util/create_test_channel.cc"
]
},
@@ -1680,22 +1674,6 @@
]
},
{
- "name": "sync_client_async_server_test",
- "build": "test",
- "language": "c++",
- "src": [
- "test/cpp/end2end/sync_client_async_server_test.cc"
- ],
- "deps": [
- "grpc++_test_util",
- "grpc_test_util",
- "grpc++",
- "grpc",
- "gpr_test_util",
- "gpr"
- ]
- },
- {
"name": "thread_pool_test",
"build": "test",
"language": "c++",
diff --git a/examples/tips/publisher_test.cc b/examples/tips/publisher_test.cc
index 34737ae6ed..db3e3784da 100644
--- a/examples/tips/publisher_test.cc
+++ b/examples/tips/publisher_test.cc
@@ -107,7 +107,7 @@ class PublisherTest : public ::testing::Test {
server_address_ << "localhost:" << port;
ServerBuilder builder;
builder.AddPort(server_address_.str());
- builder.RegisterService(service_.service());
+ builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), ChannelArguments());
diff --git a/examples/tips/subscriber_test.cc b/examples/tips/subscriber_test.cc
index fda8909a02..736e6da319 100644
--- a/examples/tips/subscriber_test.cc
+++ b/examples/tips/subscriber_test.cc
@@ -106,7 +106,7 @@ class SubscriberTest : public ::testing::Test {
server_address_ << "localhost:" << port;
ServerBuilder builder;
builder.AddPort(server_address_.str());
- builder.RegisterService(service_.service());
+ builder.RegisterService(&service_);
server_ = builder.BuildAndStart();
channel_ = CreateChannel(server_address_.str(), ChannelArguments());
diff --git a/include/grpc++/async_server_context.h b/include/grpc++/async_server_context.h
deleted file mode 100644
index c038286ac1..0000000000
--- a/include/grpc++/async_server_context.h
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_ASYNC_SERVER_CONTEXT_H__
-#define __GRPCPP_ASYNC_SERVER_CONTEXT_H__
-
-#include <chrono>
-
-#include <grpc++/config.h>
-
-struct grpc_byte_buffer;
-struct grpc_call;
-struct grpc_completion_queue;
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-using std::chrono::system_clock;
-
-namespace grpc {
-class Status;
-
-// TODO(rocking): wrap grpc c structures.
-class AsyncServerContext {
- public:
- AsyncServerContext(grpc_call* call, const grpc::string& method,
- const grpc::string& host,
- system_clock::time_point absolute_deadline);
- ~AsyncServerContext();
-
- // Accept this rpc, bind it to a completion queue.
- void Accept(grpc_completion_queue* cq);
-
- // Read and write calls, all async. Return true for success.
- bool StartRead(google::protobuf::Message* request);
- bool StartWrite(const google::protobuf::Message& response, int flags);
- bool StartWriteStatus(const Status& status);
-
- bool ParseRead(grpc_byte_buffer* read_buffer);
-
- grpc::string method() const { return method_; }
- grpc::string host() const { return host_; }
- system_clock::time_point absolute_deadline() { return absolute_deadline_; }
-
- grpc_call* call() { return call_; }
-
- private:
- AsyncServerContext(const AsyncServerContext&);
- AsyncServerContext& operator=(const AsyncServerContext&);
-
- // These properties may be moved to a ServerContext class.
- const grpc::string method_;
- const grpc::string host_;
- system_clock::time_point absolute_deadline_;
-
- google::protobuf::Message* request_; // not owned
- grpc_call* call_; // owned
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_ASYNC_SERVER_CONTEXT_H__
diff --git a/test/cpp/end2end/async_test_server.h b/include/grpc++/call.h
index a277061ace..de789febe6 100644
--- a/test/cpp/end2end/async_test_server.h
+++ b/include/grpc++/call.h
@@ -31,45 +31,73 @@
*
*/
-#ifndef __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
-#define __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
+#ifndef __GRPCPP_CALL_H__
+#define __GRPCPP_CALL_H__
-#include <condition_variable>
-#include <mutex>
-#include <string>
-
-#include <grpc++/async_server.h>
+#include <grpc++/status.h>
#include <grpc++/completion_queue.h>
+#include <memory>
+#include <vector>
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+struct grpc_call;
+struct grpc_op;
+
namespace grpc {
-namespace testing {
+class ChannelInterface;
+
+class CallOpBuffer final : public CompletionQueueTag {
+ public:
+ CallOpBuffer() : return_tag_(this) {}
+
+ void Reset(void *next_return_tag);
+
+ void AddSendInitialMetadata(std::vector<std::pair<grpc::string, grpc::string> > *metadata);
+ void AddSendMessage(const google::protobuf::Message &message);
+ void AddRecvMessage(google::protobuf::Message *message);
+ void AddClientSendClose();
+ void AddClientRecvStatus(Status *status);
-class AsyncTestServer {
+ // INTERNAL API:
+
+ // Convert to an array of grpc_op elements
+ void FillOps(grpc_op *ops, size_t *nops);
+
+ // Called by completion queue just prior to returning from Next() or Pluck()
+ void FinalizeResult(void *tag, bool *status) override;
+
+ private:
+ void *return_tag_;
+};
+
+class CCallDeleter {
public:
- AsyncTestServer();
- virtual ~AsyncTestServer();
+ void operator()(grpc_call *c);
+};
- void AddPort(const grpc::string& addr);
- void Start();
- void RequestOneRpc();
- virtual void MainLoop();
- void Shutdown();
+// Straightforward wrapping of the C call object
+class Call final {
+ public:
+ Call(grpc_call *call, ChannelInterface *channel, CompletionQueue *cq);
- CompletionQueue* completion_queue() { return &cq_; }
+ void PerformOps(CallOpBuffer *buffer);
- protected:
- void HandleQueueClosed();
+ grpc_call *call() { return call_.get(); }
+ CompletionQueue *cq() { return cq_; }
private:
- CompletionQueue cq_;
- AsyncServer server_;
- bool cq_drained_;
- std::mutex cq_drained_mu_;
- std::condition_variable cq_drained_cv_;
+ ChannelInterface *channel_;
+ CompletionQueue *cq_;
+ std::unique_ptr<grpc_call, CCallDeleter> call_;
};
-} // namespace testing
} // namespace grpc
-#endif // __GRPCPP_TEST_END2END_ASYNC_TEST_SERVER_H__
+#endif // __GRPCPP_CALL_INTERFACE_H__
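The new call.h replaces the per-event stream context with a batched model: a CallOpBuffer collects every operation for one round trip, Call::PerformOps() submits the batch, and the buffer itself comes back as the completion tag. A minimal sketch of that flow, assuming only the declarations above; the helper name is hypothetical, and the queue wait mirrors what ClientReader (further down in this diff) and BlockingUnaryCall do internally.

#include <grpc++/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/client_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>
#include <google/protobuf/message.h>

// Sketch only: perform one request/response exchange as a single batch.
grpc::Status UnaryExchange(grpc::ChannelInterface *channel,
                           const grpc::RpcMethod &method,
                           grpc::ClientContext *context,
                           const google::protobuf::Message &request,
                           google::protobuf::Message *response) {
  grpc::CompletionQueue cq;
  grpc::Call call = channel->CreateCall(method, context, &cq);

  grpc::Status status;
  grpc::CallOpBuffer buf;
  buf.AddSendMessage(request);       // queue the outgoing request
  buf.AddRecvMessage(response);      // expect a single reply
  buf.AddClientSendClose();          // half-close from the client side
  buf.AddClientRecvStatus(&status);  // collect the trailing status
  call.PerformOps(&buf);             // hand the whole batch to the C core

  void *tag;
  bool ok;
  cq.Next(&tag, &ok);  // blocks until the batch (tagged with &buf) completes

  cq.Shutdown();                 // per the contract in completion_queue.h,
  while (cq.Next(&tag, &ok)) {}  // drain until false before destruction
  return status;
}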
diff --git a/include/grpc++/channel_interface.h b/include/grpc++/channel_interface.h
index 9ed35422b8..3631ea4d5d 100644
--- a/include/grpc++/channel_interface.h
+++ b/include/grpc++/channel_interface.h
@@ -39,28 +39,26 @@
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
-namespace grpc {
+struct grpc_call;
+namespace grpc {
+class Call;
+class CallOpBuffer;
class ClientContext;
+class CompletionQueue;
class RpcMethod;
-class StreamContextInterface;
+class CallInterface;
class ChannelInterface {
public:
virtual ~ChannelInterface() {}
- virtual Status StartBlockingRpc(const RpcMethod& method,
- ClientContext* context,
- const google::protobuf::Message& request,
- google::protobuf::Message* result) = 0;
-
- virtual StreamContextInterface* CreateStream(
- const RpcMethod& method, ClientContext* context,
- const google::protobuf::Message* request,
- google::protobuf::Message* result) = 0;
+ virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) = 0;
+ virtual void PerformOpsOnCall(CallOpBuffer *ops, Call *call) = 0;
};
} // namespace grpc
diff --git a/include/grpc++/completion_queue.h b/include/grpc++/completion_queue.h
index 72f6253f8e..c976bd5b45 100644
--- a/include/grpc++/completion_queue.h
+++ b/include/grpc++/completion_queue.h
@@ -34,51 +34,66 @@
#ifndef __GRPCPP_COMPLETION_QUEUE_H__
#define __GRPCPP_COMPLETION_QUEUE_H__
+#include <grpc++/impl/client_unary_call.h>
+
struct grpc_completion_queue;
namespace grpc {
+template <class R>
+class ClientReader;
+template <class W>
+class ClientWriter;
+template <class R, class W>
+class ClientReaderWriter;
+template <class R>
+class ServerReader;
+template <class W>
+class ServerWriter;
+template <class R, class W>
+class ServerReaderWriter;
+
+class CompletionQueue;
+
+class CompletionQueueTag {
+ public:
+ // Called prior to returning from Next(), return value
+ // is the status of the operation (return status is the default thing
+ // to do)
+ virtual void FinalizeResult(void *tag, bool *status) = 0;
+};
+
// grpc_completion_queue wrapper class
class CompletionQueue {
public:
CompletionQueue();
~CompletionQueue();
- enum CompletionType {
- QUEUE_CLOSED = 0, // Shutting down.
- RPC_END = 1, // An RPC finished. Either at client or server.
- CLIENT_READ_OK = 2, // A client-side read has finished successfully.
- CLIENT_READ_ERROR = 3, // A client-side read has finished with error.
- CLIENT_WRITE_OK = 4,
- CLIENT_WRITE_ERROR = 5,
- SERVER_RPC_NEW = 6, // A new RPC just arrived at the server.
- SERVER_READ_OK = 7, // A server-side read has finished successfully.
- SERVER_READ_ERROR = 8, // A server-side read has finished with error.
- SERVER_WRITE_OK = 9,
- SERVER_WRITE_ERROR = 10,
- // Client or server has sent half close successfully.
- HALFCLOSE_OK = 11,
- // New CompletionTypes may be added in the future, so user code should
- // always
- // handle the default case of a CompletionType that appears after such code
- // was
- // written.
- DO_NOT_USE = 20,
- };
-
// Blocking read from queue.
- // For QUEUE_CLOSED, *tag is not changed.
- // For SERVER_RPC_NEW, *tag will be a newly allocated AsyncServerContext.
- // For others, *tag will be the AsyncServerContext of this rpc.
- CompletionType Next(void** tag);
+ // Returns true if an event was received, false if the queue is ready
+ // for destruction.
+ bool Next(void **tag, bool *ok);
// Shutdown has to be called, and the CompletionQueue can only be
- // destructed when the QUEUE_CLOSED message has been read with Next().
+ // destructed when false is returned from Next().
void Shutdown();
grpc_completion_queue* cq() { return cq_; }
private:
+ template <class R> friend class ::grpc::ClientReader;
+ template <class W> friend class ::grpc::ClientWriter;
+ template <class R, class W> friend class ::grpc::ClientReaderWriter;
+ template <class R> friend class ::grpc::ServerReader;
+ template <class W> friend class ::grpc::ServerWriter;
+ template <class R, class W> friend class ::grpc::ServerReaderWriter;
+ friend Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+ bool Pluck(CompletionQueueTag *tag);
+
grpc_completion_queue* cq_; // owned
};
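With the CompletionType enum gone, code that services a queue no longer switches on event kinds; it loops on the two-value Next() until the queue reports it is drained. A rough sketch of that loop, assuming only the interface above; the dispatch on the returned tag is left as a placeholder.

#include <grpc++/completion_queue.h>

// Sketch only: drain a completion queue under the new two-value Next().
void DrainCompletionQueue(grpc::CompletionQueue *cq) {
  void *tag;
  bool ok;
  // Next() returns false only after Shutdown() has been called and every
  // pending event has been delivered; that is the cue that the queue may
  // now be destroyed.
  while (cq->Next(&tag, &ok)) {
    // 'tag' is whatever was associated with the operation when it was
    // started; 'ok' reports whether that operation succeeded.
    // ... dispatch on 'tag' here ...
  }
}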
diff --git a/include/grpc++/config.h b/include/grpc++/config.h
index 52913fbf0f..1b4b463d35 100644
--- a/include/grpc++/config.h
+++ b/include/grpc++/config.h
@@ -39,6 +39,7 @@
namespace grpc {
typedef std::string string;
-}
+
+} // namespace grpc
#endif // __GRPCPP_CONFIG_H__
diff --git a/include/grpc++/impl/client_unary_call.h b/include/grpc++/impl/client_unary_call.h
new file mode 100644
index 0000000000..091430b884
--- /dev/null
+++ b/include/grpc++/impl/client_unary_call.h
@@ -0,0 +1,67 @@
+/*
+*
+* Copyright 2014, Google Inc.
+* All rights reserved.
+*
+* Redistribution and use in source and binary forms, with or without
+* modification, are permitted provided that the following conditions are
+* met:
+*
+* * Redistributions of source code must retain the above copyright
+* notice, this list of conditions and the following disclaimer.
+* * Redistributions in binary form must reproduce the above
+* copyright notice, this list of conditions and the following disclaimer
+* in the documentation and/or other materials provided with the
+* distribution.
+* * Neither the name of Google Inc. nor the names of its
+* contributors may be used to endorse or promote products derived from
+* this software without specific prior written permission.
+*
+* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+*
+*/
+
+#ifndef __GRPCPP_CLIENT_UNARY_CALL_H__
+#define __GRPCPP_CLIENT_UNARY_CALL_H__
+
+namespace google {
+namespace protobuf {
+class Message;
+} // namespace protobuf
+} // namespace google
+
+namespace grpc {
+
+class ChannelInterface;
+class ClientContext;
+class CompletionQueue;
+class RpcMethod;
+class Status;
+
+// Wrapper that begins an asynchronous unary call
+void AsyncUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result, Status *status,
+ CompletionQueue *cq, void *tag);
+
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result);
+
+} // namespace grpc
+
+#endif
+
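These two helpers are what the regenerated unary stubs (the src/compiler/cpp_generator.cc changes in the diffstat) are expected to reduce to. A hedged sketch of such a stub method follows; the method path is a placeholder and a single-argument RpcMethod constructor is assumed.

#include <grpc++/client_context.h>
#include <grpc++/impl/client_unary_call.h>
#include <grpc++/impl/rpc_method.h>
#include <grpc++/status.h>

// Sketch only: what a generated unary stub method reduces to.
grpc::Status CallEcho(grpc::ChannelInterface *channel,
                      grpc::ClientContext *context,
                      const google::protobuf::Message &request,
                      google::protobuf::Message *response) {
  // The fully qualified method name is illustrative, not from this diff.
  static const grpc::RpcMethod method("/grpc.testing.EchoService/Echo");
  return grpc::BlockingUnaryCall(channel, method, context, request, response);
}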
diff --git a/include/grpc++/impl/rpc_method.h b/include/grpc++/impl/rpc_method.h
index 75fec356dd..bb16e64c96 100644
--- a/include/grpc++/impl/rpc_method.h
+++ b/include/grpc++/impl/rpc_method.h
@@ -37,8 +37,8 @@
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
diff --git a/include/grpc++/impl/rpc_service_method.h b/include/grpc++/impl/rpc_service_method.h
index 620de5e67f..0fb4f79b59 100644
--- a/include/grpc++/impl/rpc_service_method.h
+++ b/include/grpc++/impl/rpc_service_method.h
@@ -55,25 +55,18 @@ class MethodHandler {
public:
virtual ~MethodHandler() {}
struct HandlerParameter {
- HandlerParameter(ServerContext* context,
+ HandlerParameter(Call *c,
+ ServerContext* context,
const google::protobuf::Message* req,
google::protobuf::Message* resp)
- : server_context(context),
+ : call(c),
+ server_context(context),
request(req),
- response(resp),
- stream_context(nullptr) {}
- HandlerParameter(ServerContext* context,
- const google::protobuf::Message* req,
- google::protobuf::Message* resp,
- StreamContextInterface* stream)
- : server_context(context),
- request(req),
- response(resp),
- stream_context(stream) {}
+ response(resp) {}
+ Call* call;
ServerContext* server_context;
const google::protobuf::Message* request;
google::protobuf::Message* response;
- StreamContextInterface* stream_context;
};
virtual Status RunHandler(const HandlerParameter& param) = 0;
};
@@ -114,7 +107,7 @@ class ClientStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReader<RequestType> reader(param.stream_context);
+ ServerReader<RequestType> reader(param.call);
return func_(service_, param.server_context, &reader,
dynamic_cast<ResponseType*>(param.response));
}
@@ -136,7 +129,7 @@ class ServerStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerWriter<ResponseType> writer(param.stream_context);
+ ServerWriter<ResponseType> writer(param.call);
return func_(service_, param.server_context,
dynamic_cast<const RequestType*>(param.request), &writer);
}
@@ -159,7 +152,7 @@ class BidiStreamingHandler : public MethodHandler {
: func_(func), service_(service) {}
Status RunHandler(const HandlerParameter& param) final {
- ServerReaderWriter<ResponseType, RequestType> stream(param.stream_context);
+ ServerReaderWriter<ResponseType, RequestType> stream(param.call);
return func_(service_, param.server_context, &stream);
}
@@ -203,7 +196,7 @@ class RpcService {
public:
// Takes ownership.
void AddMethod(RpcServiceMethod* method) {
- methods_.push_back(std::unique_ptr<RpcServiceMethod>(method));
+ methods_.emplace_back(method);
}
RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
diff --git a/include/grpc++/async_server.h b/include/grpc++/impl/service_type.h
index fe2c5d9367..0684f322d8 100644
--- a/include/grpc++/async_server.h
+++ b/include/grpc++/impl/service_type.h
@@ -31,40 +31,25 @@
*
*/
-#ifndef __GRPCPP_ASYNC_SERVER_H__
-#define __GRPCPP_ASYNC_SERVER_H__
-
-#include <mutex>
-
-#include <grpc++/config.h>
-
-struct grpc_server;
+#ifndef __GRPCPP_IMPL_SERVICE_TYPE_H__
+#define __GRPCPP_IMPL_SERVICE_TYPE_H__
namespace grpc {
-class CompletionQueue;
-
-class AsyncServer {
- public:
- explicit AsyncServer(CompletionQueue* cc);
- ~AsyncServer();
- void AddPort(const grpc::string& addr);
+class RpcService;
- void Start();
-
- // The user has to call this to get one new rpc on the completion
- // queue.
- void RequestOneRpc();
-
- void Shutdown();
+class SynchronousService {
+ public:
+ virtual ~SynchronousService() {}
+ virtual RpcService *service() = 0;
+};
- private:
- bool started_;
- std::mutex shutdown_mu_;
- bool shutdown_;
- grpc_server* server_;
+class AsynchronousService {
+ public:
+ virtual ~AsynchronousService() {}
+ virtual RpcService *service() = 0;
};
} // namespace grpc
-#endif // __GRPCPP_ASYNC_SERVER_H__
+#endif // __GRPCPP_IMPL_SERVICE_TYPE_H__
\ No newline at end of file
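The SynchronousService/AsynchronousService split is what lets callers hand a generated service object straight to the builder (compare the RegisterService(&service_) change in the tips tests above), since the builder can call service() itself. A speculative sketch of a service class fitting that shape; the class name and the lazy construction are illustrative, not the actual generated code.

#include <memory>

#include <grpc++/impl/rpc_service_method.h>
#include <grpc++/impl/service_type.h>

// Sketch only: the minimum a synchronous service object has to provide.
class EchoServiceImpl : public grpc::SynchronousService {
 public:
  grpc::RpcService *service() override {
    if (!rpc_service_) {
      // Assumed default-constructible RpcService; the real generated code
      // would AddMethod() one RpcServiceMethod per proto method here.
      rpc_service_.reset(new grpc::RpcService);
    }
    return rpc_service_.get();
  }

 private:
  std::unique_ptr<grpc::RpcService> rpc_service_;
};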
diff --git a/include/grpc++/server.h b/include/grpc++/server.h
index 5fa371ba62..670ffa7815 100644
--- a/include/grpc++/server.h
+++ b/include/grpc++/server.h
@@ -48,8 +48,8 @@ struct grpc_server;
namespace google {
namespace protobuf {
class Message;
-}
-}
+} // namespace protobuf
+} // namespace google
namespace grpc {
class AsyncServerContext;
@@ -70,17 +70,16 @@ class Server {
friend class ServerBuilder;
// ServerBuilder use only
- Server(ThreadPoolInterface* thread_pool, ServerCredentials* creds);
+ Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned, ServerCredentials* creds);
Server();
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance.
- void RegisterService(RpcService* service);
+ bool RegisterService(RpcService* service);
// Add a listening port. Can be called multiple times.
- void AddPort(const grpc::string& addr);
+ int AddPort(const grpc::string& addr);
// Start the server.
- void Start();
+ bool Start();
- void AllowOneRpc();
void HandleQueueClosed();
void RunRpc();
void ScheduleCallback();
diff --git a/include/grpc++/server_builder.h b/include/grpc++/server_builder.h
index cf27452010..8b4c81bc87 100644
--- a/include/grpc++/server_builder.h
+++ b/include/grpc++/server_builder.h
@@ -41,9 +41,11 @@
namespace grpc {
+class AsynchronousService;
class RpcService;
class Server;
class ServerCredentials;
+class SynchronousService;
class ThreadPoolInterface;
class ServerBuilder {
@@ -53,7 +55,9 @@ class ServerBuilder {
// Register a service. This call does not take ownership of the service.
// The service must exist for the lifetime of the Server instance returned by
// BuildAndStart().
- void RegisterService(RpcService* service);
+ void RegisterService(SynchronousService* service);
+
+ void RegisterAsyncService(AsynchronousService *service);
// Add a listening port. Can be called multiple times.
void AddPort(const grpc::string& addr);
@@ -71,9 +75,10 @@ class ServerBuilder {
private:
std::vector<RpcService*> services_;
+ std::vector<AsynchronousService*> async_services_;
std::vector<grpc::string> ports_;
std::shared_ptr<ServerCredentials> creds_;
- ThreadPoolInterface* thread_pool_;
+ ThreadPoolInterface* thread_pool_ = nullptr;
};
} // namespace grpc
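With the builder now distinguishing synchronous and asynchronous services and creating a default thread pool only when needed, a typical startup path looks roughly like the sketch below; Echo::Service is a placeholder for a generated synchronous service and the address is arbitrary.

// Sketch of driving the reworked ServerBuilder; Echo::Service and the
// listening address are placeholders, not part of this change.
#include <memory>
#include <grpc++/server.h>
#include <grpc++/server_builder.h>

void RunServer() {
  Echo::Service service;              // assumed generated sync service
  grpc::ServerBuilder builder;
  builder.AddPort("0.0.0.0:50051");   // may be called multiple times
  builder.RegisterService(&service);  // does not take ownership
  std::unique_ptr<grpc::Server> server = builder.BuildAndStart();
  if (!server) {
    // BuildAndStart() now signals failure (duplicate method name, bad
    // port, or mixing sync and async services) by returning nullptr.
    return;
  }
  // ... block until shutdown ...
}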
diff --git a/include/grpc++/server_context.h b/include/grpc++/server_context.h
index 47fd6cf1c8..4af9fd6aaa 100644
--- a/include/grpc++/server_context.h
+++ b/include/grpc++/server_context.h
@@ -35,6 +35,9 @@
#define __GRPCPP_SERVER_CONTEXT_H_
#include <chrono>
+#include <vector>
+
+#include "config.h"
namespace grpc {
@@ -43,7 +46,10 @@ class ServerContext {
public:
virtual ~ServerContext() {}
- virtual std::chrono::system_clock::time_point absolute_deadline() const = 0;
+ std::chrono::system_clock::time_point absolute_deadline();
+
+ private:
+ std::vector<std::pair<grpc::string, grpc::string> > metadata_;
};
} // namespace grpc
diff --git a/include/grpc++/stream.h b/include/grpc++/stream.h
index b8982f4d93..c30825a7a5 100644
--- a/include/grpc++/stream.h
+++ b/include/grpc++/stream.h
@@ -34,7 +34,9 @@
#ifndef __GRPCPP_STREAM_H__
#define __GRPCPP_STREAM_H__
-#include <grpc++/stream_context_interface.h>
+#include <grpc++/call.h>
+#include <grpc++/channel_interface.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
#include <grpc/support/log.h>
@@ -45,16 +47,12 @@ class ClientStreamingInterface {
public:
virtual ~ClientStreamingInterface() {}
- // Try to cancel the stream. Wait() still needs to be called to get the final
- // status. Cancelling after the stream has finished has no effects.
- virtual void Cancel() = 0;
-
// Wait until the stream finishes, and return the final status. When the
// client side declares it has no more message to send, either implicitly or
// by calling WritesDone, it needs to make sure there is no more message to
// be received from the server, either implicitly or by getting a false from
// a Read(). Otherwise, this implicitly cancels the stream.
- virtual const Status& Wait() = 0;
+ virtual Status Finish() = 0;
};
// An interface that yields a sequence of R messages.
@@ -82,147 +80,391 @@ class WriterInterface {
};
template <class R>
-class ClientReader : public ClientStreamingInterface,
- public ReaderInterface<R> {
+class ClientReader final : public ClientStreamingInterface,
+ public ReaderInterface<R> {
public:
// Blocking create a stream and write the first request out.
- explicit ClientReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Write(context_->request(), true);
+ ClientReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ CallOpBuffer buf;
+ buf.AddSendMessage(request);
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ cq_.Pluck(&buf);
}
- ~ClientReader() { delete context_; }
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- virtual const Status& Wait() { return context_->Wait(); }
+template <class W>
+class ClientWriter final : public ClientStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
+
+ // Read the final response and wait for the final status.
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddRecvMessage(response_);
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
};
-template <class W>
-class ClientWriter : public ClientStreamingInterface,
- public WriterInterface<W> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientReaderWriter final : public ClientStreamingInterface,
+ public WriterInterface<W>,
+ public ReaderInterface<R> {
public:
// Blocking create a stream.
- explicit ClientWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ ClientReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual bool Read(R *msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- ~ClientWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual bool WritesDone() {
+ CallOpBuffer buf;
+ buf.AddClientSendClose();
+ call_.PerformOps(&buf);
+ return cq_.Pluck(&buf);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual Status Finish() override {
+ CallOpBuffer buf;
+ Status status;
+ buf.AddClientRecvStatus(&status);
+ call_.PerformOps(&buf);
+ GPR_ASSERT(cq_.Pluck(&buf));
+ return status;
+ }
- virtual void Cancel() { context_->Cancel(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+};
- // Read the final response and wait for the final status.
- virtual const Status& Wait() {
- bool success = context_->Read(context_->response());
- if (!success) {
- Cancel();
- } else {
- success = context_->Read(nullptr);
- if (success) {
- Cancel();
- }
- }
- return context_->Wait();
+template <class R>
+class ServerReader final : public ReaderInterface<R> {
+ public:
+ explicit ServerReader(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
private:
- StreamContextInterface* const context_;
+ Call* call_;
};
-// Client-side interface for bi-directional streaming.
+template <class W>
+class ServerWriter final : public WriterInterface<W> {
+ public:
+ explicit ServerWriter(Call* call) : call_(call) {}
+
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ Call* call_;
+};
+
+// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ClientReaderWriter : public ClientStreamingInterface,
- public WriterInterface<W>,
+class ServerReaderWriter final : public WriterInterface<W>,
public ReaderInterface<R> {
public:
- // Blocking create a stream.
- explicit ClientReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(false);
+ explicit ServerReaderWriter(Call* call) : call_(call) {}
+
+ virtual bool Read(R* msg) override {
+ CallOpBuffer buf;
+ buf.AddRecvMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
}
- ~ClientReaderWriter() { delete context_; }
+ virtual bool Write(const W& msg) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
+ return call_->cq()->Pluck(&buf);
+ }
+
+ private:
+ CompletionQueue* cq_;
+ Call* call_;
+};
+
+// Async interfaces
+// Common interface for all client side streaming.
+class ClientAsyncStreamingInterface {
+ public:
+ virtual ~ClientAsyncStreamingInterface() {}
+
+ virtual void Finish(Status* status, void* tag) = 0;
+};
+
+// An interface that yields a sequence of R messages.
+template <class R>
+class AsyncReaderInterface {
+ public:
+ virtual ~AsyncReaderInterface() {}
+
+ virtual void Read(R* msg, void* tag) = 0;
+};
- virtual bool Read(R* msg) { return context_->Read(msg); }
+// An interface that can be fed a sequence of W messages.
+template <class W>
+class AsyncWriterInterface {
+ public:
+ virtual ~AsyncWriterInterface() {}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) = 0;
+};
+
+template <class R>
+class ClientAsyncReader final : public ClientAsyncStreamingInterface,
+ public AsyncReaderInterface<R> {
+ public:
+ // Blocking create a stream and write the first request out.
+ ClientAsyncReader(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request, void* tag)
+ : call_(channel->CreateCall(method, context, &cq_)) {
+ init_buf_.Reset(tag);
+ init_buf_.AddSendMessage(request);
+ init_buf_.AddClientSendClose();
+ call_.PerformOps(&init_buf_);
}
- virtual void WritesDone() { context_->Write(nullptr, true); }
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
- virtual void Cancel() { context_->Cancel(); }
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
- virtual const Status& Wait() { return context_->Wait(); }
+ private:
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer init_buf_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer finish_buf_;
+};
+
+template <class W>
+class ClientAsyncWriter final : public ClientAsyncStreamingInterface,
+ public WriterInterface<W> {
+ public:
+ // Blocking create a stream.
+ ClientAsyncWriter(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ google::protobuf::Message *response)
+ : response_(response),
+ call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
+ }
+
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddRecvMessage(response_);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_;
+ google::protobuf::Message *const response_;
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
-template <class R>
-class ServerReader : public ReaderInterface<R> {
+// Client-side interface for bi-directional streaming.
+template <class W, class R>
+class ClientAsyncReaderWriter final : public ClientAsyncStreamingInterface,
+ public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReader(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
+ ClientAsyncReaderWriter(ChannelInterface *channel,
+ const RpcMethod &method, ClientContext *context)
+ : call_(channel->CreateCall(method, context, &cq_)) {}
+
+ virtual void Read(R *msg, void* tag) override {
+ read_buf_.Reset(tag);
+ read_buf_.AddRecvMessage(msg);
+ call_.PerformOps(&read_buf_);
+ }
+
+ virtual void Write(const W& msg, void* tag) override {
+ write_buf_.Reset(tag);
+ write_buf_.AddSendMessage(msg);
+ call_.PerformOps(&write_buf_);
}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void WritesDone(void* tag) override {
+ writes_done_buf_.Reset(tag);
+ writes_done_buf_.AddClientSendClose();
+ call_.PerformOps(&writes_done_buf_);
+ }
+
+ virtual void Finish(Status* status, void* tag) override {
+ finish_buf_.Reset(tag);
+ finish_buf_.AddClientRecvStatus(status);
+ call_.PerformOps(&finish_buf_);
+ }
private:
- StreamContextInterface* const context_; // not owned
+ CompletionQueue cq_;
+ Call call_;
+ CallOpBuffer read_buf_;
+ CallOpBuffer write_buf_;
+ CallOpBuffer writes_done_buf_;
+ CallOpBuffer finish_buf_;
};
+// TODO(yangg) Move out of stream.h
template <class W>
-class ServerWriter : public WriterInterface<W> {
+class ServerAsyncResponseWriter final {
public:
- explicit ServerWriter(StreamContextInterface* context) : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- context_->Read(context_->request());
+ explicit ServerAsyncResponseWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) override {
+ CallOpBuffer buf;
+ buf.AddSendMessage(msg);
+ call_->PerformOps(&buf);
}
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ private:
+ Call* call_;
+};
+
+template <class R>
+class ServerAsyncReader : public AsyncReaderInterface<R> {
+ public:
+ explicit ServerAsyncReader(Call* call) : call_(call) {}
+
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
+
+ private:
+ Call* call_;
+};
+
+template <class W>
+class ServerAsyncWriter : public AsyncWriterInterface<W> {
+ public:
+ explicit ServerAsyncWriter(Call* call) : call_(call) {}
+
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
// Server-side interface for bi-directional streaming.
template <class W, class R>
-class ServerReaderWriter : public WriterInterface<W>,
- public ReaderInterface<R> {
+class ServerAsyncReaderWriter : public AsyncWriterInterface<W>,
+ public AsyncReaderInterface<R> {
public:
- explicit ServerReaderWriter(StreamContextInterface* context)
- : context_(context) {
- GPR_ASSERT(context_);
- context_->Start(true);
- }
+ explicit ServerAsyncReaderWriter(Call* call) : call_(call) {}
- virtual bool Read(R* msg) { return context_->Read(msg); }
+ virtual void Read(R* msg, void* tag) {
+ // TODO
+ }
- virtual bool Write(const W& msg) {
- return context_->Write(const_cast<W*>(&msg), false);
+ virtual void Write(const W& msg, void* tag) {
+ // TODO
}
private:
- StreamContextInterface* const context_; // not owned
+ Call* call_;
};
} // namespace grpc
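The practical effect of the stream.h rewrite on callers is that every streaming wrapper is now a thin loop over CallOpBuffer batches, and the old Cancel()/Wait() pair is replaced by a single Finish() returning Status by value. A hedged sketch of consuming a server-streaming method through a generated stub follows; the stub type, method, and message types are assumptions, and only Read()/Finish() come from the header above.

// Sketch of reading a server stream with the rewritten ClientReader.
#include <memory>
#include <grpc++/client_context.h>
#include <grpc++/status.h>
#include <grpc++/stream.h>

void ReadAll(Echo::Stub* stub) {      // assumed generated stub type
  grpc::ClientContext context;
  ListRequest request;                // assumed proto message
  std::unique_ptr<grpc::ClientReader<ListReply> > reader(
      stub->List(&context, &request));  // assumed generated method
  ListReply reply;
  while (reader->Read(&reply)) {      // each Read() is one plucked batch
    // ... consume reply ...
  }
  grpc::Status status = reader->Finish();  // final status, by value
}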
diff --git a/src/core/support/cpu.h b/include/grpc/support/cpu.h
index f8ec2c6522..9025f7c21f 100644
--- a/src/core/support/cpu.h
+++ b/include/grpc/support/cpu.h
@@ -34,6 +34,10 @@
#ifndef __GRPC_INTERNAL_SUPPORT_CPU_H__
#define __GRPC_INTERNAL_SUPPORT_CPU_H__
+#ifdef __cplusplus
+extern "C" {
+#endif
+
/* Interface providing CPU information for currently running system */
/* Return the number of CPU cores on the current system. Will return 0 if
@@ -46,4 +50,8 @@ unsigned gpr_cpu_num_cores(void);
[0, gpr_cpu_num_cores() - 1] */
unsigned gpr_cpu_current_cpu(void);
+#ifdef __cplusplus
+} // extern "C"
+#endif
+
#endif /* __GRPC_INTERNAL_SUPPORT_CPU_H__ */
diff --git a/src/compiler/cpp_generator.cc b/src/compiler/cpp_generator.cc
index 8724f97e8b..d04c2efcc4 100644
--- a/src/compiler/cpp_generator.cc
+++ b/src/compiler/cpp_generator.cc
@@ -61,6 +61,17 @@ bool BidiStreaming(const google::protobuf::MethodDescriptor *method) {
return method->client_streaming() && method->server_streaming();
}
+bool HasUnaryCalls(const google::protobuf::FileDescriptor *file) {
+ for (int i = 0; i < file->service_count(); i++) {
+ for (int j = 0; j < file->service(i)->method_count(); j++) {
+ if (NoStreaming(file->service(i)->method(j))) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
bool HasClientOnlyStreaming(const google::protobuf::FileDescriptor *file) {
for (int i = 0; i < file->service_count(); i++) {
for (int j = 0; j < file->service(i)->method_count(); j++) {
@@ -97,20 +108,29 @@ bool HasBidiStreaming(const google::protobuf::FileDescriptor *file) {
std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
std::string temp =
- "#include \"grpc++/impl/internal_stub.h\"\n"
- "#include \"grpc++/status.h\"\n"
+ "#include <grpc++/impl/internal_stub.h>\n"
+ "#include <grpc++/impl/service_type.h>\n"
+ "#include <grpc++/status.h>\n"
"\n"
"namespace grpc {\n"
"class ChannelInterface;\n"
"class RpcService;\n"
"class ServerContext;\n";
+ if (HasUnaryCalls(file)) {
+ temp.append(
+ "template <class OutMessage> class ServerAsyncResponseWriter;\n");
+ }
if (HasClientOnlyStreaming(file)) {
temp.append("template <class OutMessage> class ClientWriter;\n");
temp.append("template <class InMessage> class ServerReader;\n");
+ temp.append("template <class OutMessage> class ClientAsyncWriter;\n");
+ temp.append("template <class InMessage> class ServerAsyncReader;\n");
}
if (HasServerOnlyStreaming(file)) {
temp.append("template <class InMessage> class ClientReader;\n");
temp.append("template <class OutMessage> class ServerWriter;\n");
+ temp.append("template <class OutMessage> class ClientAsyncReader;\n");
+ temp.append("template <class InMessage> class ServerAsyncWriter;\n");
}
if (HasBidiStreaming(file)) {
temp.append(
@@ -119,16 +139,24 @@ std::string GetHeaderIncludes(const google::protobuf::FileDescriptor *file) {
temp.append(
"template <class OutMessage, class InMessage>\n"
"class ServerReaderWriter;\n");
+ temp.append(
+ "template <class OutMessage, class InMessage>\n"
+ "class ClientAsyncReaderWriter;\n");
+ temp.append(
+ "template <class OutMessage, class InMessage>\n"
+ "class ServerAsyncReaderWriter;\n");
}
temp.append("} // namespace grpc\n");
return temp;
}
std::string GetSourceIncludes() {
- return "#include \"grpc++/channel_interface.h\"\n"
- "#include \"grpc++/impl/rpc_method.h\"\n"
- "#include \"grpc++/impl/rpc_service_method.h\"\n"
- "#include \"grpc++/stream.h\"\n";
+ return "#include <grpc++/channel_interface.h>\n"
+ "#include <grpc++/impl/client_unary_call.h>\n"
+ "#include <grpc++/impl/rpc_method.h>\n"
+ "#include <grpc++/impl/rpc_service_method.h>\n"
+ "#include <grpc++/impl/service_type.h>\n"
+ "#include <grpc++/stream.h>\n";
}
void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
@@ -142,27 +170,45 @@ void PrintHeaderClientMethod(google::protobuf::io::Printer *printer,
if (NoStreaming(method)) {
printer->Print(*vars,
"::grpc::Status $Method$(::grpc::ClientContext* context, "
- "const $Request$& request, $Response$* response);\n\n");
+ "const $Request$& request, $Response$* response);\n");
+ printer->Print(*vars,
+ "void $Method$(::grpc::ClientContext* context, "
+ "const $Request$& request, $Response$* response, "
+ "::grpc::Status *status, "
+ "::grpc::CompletionQueue *cq, void *tag);\n");
} else if (ClientOnlyStreaming(method)) {
- printer->Print(
- *vars,
- "::grpc::ClientWriter< $Request$>* $Method$("
- "::grpc::ClientContext* context, $Response$* response);\n\n");
+ printer->Print(*vars,
+ "::grpc::ClientWriter< $Request$>* $Method$("
+ "::grpc::ClientContext* context, $Response$* response);\n");
+ printer->Print(*vars,
+ "::grpc::ClientWriter< $Request$>* $Method$("
+ "::grpc::ClientContext* context, $Response$* response, "
+ "::grpc::Status *status, "
+ "::grpc::CompletionQueue *cq, void *tag);\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
*vars,
"::grpc::ClientReader< $Response$>* $Method$("
- "::grpc::ClientContext* context, const $Request$* request);\n\n");
+ "::grpc::ClientContext* context, const $Request$* request);\n");
+ printer->Print(*vars,
+ "::grpc::ClientReader< $Response$>* $Method$("
+ "::grpc::ClientContext* context, const $Request$* request, "
+ "::grpc::CompletionQueue *cq, void *tag);\n");
} else if (BidiStreaming(method)) {
printer->Print(*vars,
"::grpc::ClientReaderWriter< $Request$, $Response$>* "
- "$Method$(::grpc::ClientContext* context);\n\n");
+ "$Method$(::grpc::ClientContext* context);\n");
+ printer->Print(*vars,
+ "::grpc::ClientAsyncReaderWriter< $Request$, $Response$>* "
+ "$Method$(::grpc::ClientContext* context, "
+ "::grpc::CompletionQueue *cq, void *tag);\n");
}
}
-void PrintHeaderServerMethod(google::protobuf::io::Printer *printer,
- const google::protobuf::MethodDescriptor *method,
- std::map<std::string, std::string> *vars) {
+void PrintHeaderServerMethodSync(
+ google::protobuf::io::Printer *printer,
+ const google::protobuf::MethodDescriptor *method,
+ std::map<std::string, std::string> *vars) {
(*vars)["Method"] = method->name();
(*vars)["Request"] =
grpc_cpp_generator::ClassName(method->input_type(), true);
@@ -194,19 +240,56 @@ void PrintHeaderServerMethod(google::protobuf::io::Printer *printer,
}
}
+void PrintHeaderServerMethodAsync(
+ google::protobuf::io::Printer *printer,
+ const google::protobuf::MethodDescriptor *method,
+ std::map<std::string, std::string> *vars) {
+ (*vars)["Method"] = method->name();
+ (*vars)["Request"] =
+ grpc_cpp_generator::ClassName(method->input_type(), true);
+ (*vars)["Response"] =
+ grpc_cpp_generator::ClassName(method->output_type(), true);
+ if (NoStreaming(method)) {
+ printer->Print(*vars,
+ "void $Method$("
+ "::grpc::ServerContext* context, $Request$* request, "
+ "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
+ "::grpc::CompletionQueue* cq, void *tag);\n");
+ } else if (ClientOnlyStreaming(method)) {
+ printer->Print(*vars,
+ "void $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReader< $Request$>* reader, "
+ "::grpc::CompletionQueue* cq, void *tag);\n");
+ } else if (ServerOnlyStreaming(method)) {
+ printer->Print(*vars,
+ "void $Method$("
+ "::grpc::ServerContext* context, $Request$* request, "
+ "::grpc::ServerAsyncWriter< $Response$>* writer, "
+ "::grpc::CompletionQueue* cq, void *tag);\n");
+ } else if (BidiStreaming(method)) {
+ printer->Print(
+ *vars,
+ "void $Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
+ "::grpc::CompletionQueue* cq, void *tag);\n");
+ }
+}
+
void PrintHeaderService(google::protobuf::io::Printer *printer,
const google::protobuf::ServiceDescriptor *service,
std::map<std::string, std::string> *vars) {
(*vars)["Service"] = service->name();
printer->Print(*vars,
- "class $Service$ {\n"
+ "class $Service$ final {\n"
" public:\n");
printer->Indent();
// Client side
printer->Print(
- "class Stub : public ::grpc::InternalStub {\n"
+ "class Stub final : public ::grpc::InternalStub {\n"
" public:\n");
printer->Indent();
for (int i = 0; i < service->method_count(); ++i) {
@@ -220,17 +303,34 @@ void PrintHeaderService(google::protobuf::io::Printer *printer,
printer->Print("\n");
- // Server side
+ // Server side - Synchronous
printer->Print(
- "class Service {\n"
+ "class Service : public ::grpc::SynchronousService {\n"
" public:\n");
printer->Indent();
printer->Print("Service() : service_(nullptr) {}\n");
printer->Print("virtual ~Service();\n");
for (int i = 0; i < service->method_count(); ++i) {
- PrintHeaderServerMethod(printer, service->method(i), vars);
+ PrintHeaderServerMethodSync(printer, service->method(i), vars);
+ }
+ printer->Print("::grpc::RpcService* service() override final;\n");
+ printer->Outdent();
+ printer->Print(
+ " private:\n"
+ " ::grpc::RpcService* service_;\n");
+ printer->Print("};\n");
+
+ // Server side - Asynchronous
+ printer->Print(
+ "class AsyncService final : public ::grpc::AsynchronousService {\n"
+ " public:\n");
+ printer->Indent();
+ printer->Print("AsyncService() : service_(nullptr) {}\n");
+ printer->Print("~AsyncService();\n");
+ for (int i = 0; i < service->method_count(); ++i) {
+ PrintHeaderServerMethodAsync(printer, service->method(i), vars);
}
- printer->Print("::grpc::RpcService* service();\n");
+ printer->Print("::grpc::RpcService* service() override;\n");
printer->Outdent();
printer->Print(
" private:\n"
@@ -268,7 +368,7 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, "
"const $Request$& request, $Response$* response) {\n");
printer->Print(*vars,
- " return channel()->StartBlockingRpc("
+ "return ::grpc::BlockingUnaryCall(channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\"), "
"context, request, response);\n"
"}\n\n");
@@ -279,10 +379,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, $Response$* response) {\n");
printer->Print(*vars,
" return new ::grpc::ClientWriter< $Request$>("
- "channel()->CreateStream("
+ "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::CLIENT_STREAMING), "
- "context, nullptr, response));\n"
+ "context, response);\n"
"}\n\n");
} else if (ServerOnlyStreaming(method)) {
printer->Print(
@@ -291,10 +391,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
"::grpc::ClientContext* context, const $Request$* request) {\n");
printer->Print(*vars,
" return new ::grpc::ClientReader< $Response$>("
- "channel()->CreateStream("
+ "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::SERVER_STREAMING), "
- "context, request, nullptr));\n"
+ "context, *request);\n"
"}\n\n");
} else if (BidiStreaming(method)) {
printer->Print(
@@ -304,10 +404,10 @@ void PrintSourceClientMethod(google::protobuf::io::Printer *printer,
printer->Print(
*vars,
" return new ::grpc::ClientReaderWriter< $Request$, $Response$>("
- "channel()->CreateStream("
+ "channel(),"
"::grpc::RpcMethod(\"/$Package$$Service$/$Method$\", "
"::grpc::RpcMethod::RpcType::BIDI_STREAMING), "
- "context, nullptr, nullptr));\n"
+ "context);\n"
"}\n\n");
}
}
@@ -362,6 +462,47 @@ void PrintSourceServerMethod(google::protobuf::io::Printer *printer,
}
}
+void PrintSourceServerAsyncMethod(google::protobuf::io::Printer *printer,
+ const google::protobuf::MethodDescriptor *method,
+ std::map<std::string, std::string> *vars) {
+ (*vars)["Method"] = method->name();
+ (*vars)["Request"] =
+ grpc_cpp_generator::ClassName(method->input_type(), true);
+ (*vars)["Response"] =
+ grpc_cpp_generator::ClassName(method->output_type(), true);
+ if (NoStreaming(method)) {
+ printer->Print(*vars,
+ "void $Service$::AsyncService::$Method$("
+ "::grpc::ServerContext* context, "
+ "$Request$* request, "
+ "::grpc::ServerAsyncResponseWriter< $Response$>* response, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print("}\n\n");
+ } else if (ClientOnlyStreaming(method)) {
+ printer->Print(*vars,
+ "void $Service$::AsyncService::$Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReader< $Request$>* reader, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print("}\n\n");
+ } else if (ServerOnlyStreaming(method)) {
+ printer->Print(*vars,
+ "void $Service$::AsyncService::$Method$("
+ "::grpc::ServerContext* context, "
+ "$Request$* request, "
+ "::grpc::ServerAsyncWriter< $Response$>* writer, "
+ "::grpc::CompletionQueue* cq, void* tag) {\n");
+ printer->Print("}\n\n");
+ } else if (BidiStreaming(method)) {
+ printer->Print(*vars,
+ "void $Service$::AsyncService::$Method$("
+ "::grpc::ServerContext* context, "
+ "::grpc::ServerAsyncReaderWriter< $Response$, $Request$>* stream, "
+ "::grpc::CompletionQueue* cq, void *tag) {\n");
+ printer->Print("}\n\n");
+ }
+}
+
void PrintSourceService(google::protobuf::io::Printer *printer,
const google::protobuf::ServiceDescriptor *service,
std::map<std::string, std::string> *vars) {
@@ -384,6 +525,7 @@ void PrintSourceService(google::protobuf::io::Printer *printer,
"}\n\n");
for (int i = 0; i < service->method_count(); ++i) {
PrintSourceServerMethod(printer, service->method(i), vars);
+ PrintSourceServerAsyncMethod(printer, service->method(i), vars);
}
printer->Print(*vars,
"::grpc::RpcService* $Service$::Service::service() {\n");
diff --git a/src/core/statistics/census_log.c b/src/core/statistics/census_log.c
index aea0a33bad..1504c027de 100644
--- a/src/core/statistics/census_log.c
+++ b/src/core/statistics/census_log.c
@@ -91,9 +91,9 @@
*/
#include "src/core/statistics/census_log.h"
#include <string.h>
-#include "src/core/support/cpu.h"
#include <grpc/support/alloc.h>
#include <grpc/support/atm.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
diff --git a/src/core/support/cpu_linux.c b/src/core/support/cpu_linux.c
index ad82174894..c8375e65b6 100644
--- a/src/core/support/cpu_linux.c
+++ b/src/core/support/cpu_linux.c
@@ -39,7 +39,7 @@
#ifdef GPR_CPU_LINUX
-#include "src/core/support/cpu.h"
+#include <grpc/support/cpu.h>
#include <sched.h>
#include <errno.h>
diff --git a/src/cpp/client/channel.cc b/src/cpp/client/channel.cc
index 3f39364bda..a1539e4711 100644
--- a/src/cpp/client/channel.cc
+++ b/src/cpp/client/channel.cc
@@ -42,9 +42,10 @@
#include <grpc/support/slice.h>
#include "src/cpp/proto/proto_utils.h"
-#include "src/cpp/stream/stream_context.h"
+#include <grpc++/call.h>
#include <grpc++/channel_arguments.h>
#include <grpc++/client_context.h>
+#include <grpc++/completion_queue.h>
#include <grpc++/config.h>
#include <grpc++/credentials.h>
#include <grpc++/impl/rpc_method.h>
@@ -77,103 +78,23 @@ Channel::Channel(const grpc::string &target,
Channel::~Channel() { grpc_channel_destroy(c_channel_); }
-namespace {
-// Pluck the finished event and set to status when it is not nullptr.
-void GetFinalStatus(grpc_completion_queue *cq, void *finished_tag,
- Status *status) {
- grpc_event *ev =
- grpc_completion_queue_pluck(cq, finished_tag, gpr_inf_future);
- if (status) {
- StatusCode error_code = static_cast<StatusCode>(ev->data.finished.status);
- grpc::string details(ev->data.finished.details ? ev->data.finished.details
- : "");
- *status = Status(error_code, details);
- }
- grpc_event_finish(ev);
+Call Channel::CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) {
+ auto c_call =
+ grpc_channel_create_call(c_channel_, cq->cq(), method.name(),
+ target_.c_str(), context->RawDeadline());
+ context->set_call(c_call);
+ return Call(c_call, this, cq);
}
-} // namespace
-// TODO(yangg) more error handling
-Status Channel::StartBlockingRpc(const RpcMethod &method,
- ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result) {
- Status status;
- grpc_call *call = grpc_channel_create_call_old(
- c_channel_, method.name(), target_.c_str(), context->RawDeadline());
- context->set_call(call);
-
- grpc_event *ev;
- void *finished_tag = reinterpret_cast<char *>(call);
- void *metadata_read_tag = reinterpret_cast<char *>(call) + 2;
- void *write_tag = reinterpret_cast<char *>(call) + 3;
- void *halfclose_tag = reinterpret_cast<char *>(call) + 4;
- void *read_tag = reinterpret_cast<char *>(call) + 5;
-
- grpc_completion_queue *cq = grpc_completion_queue_create();
- context->set_cq(cq);
- // add_metadata from context
- //
- // invoke
- GPR_ASSERT(grpc_call_invoke_old(call, cq, metadata_read_tag, finished_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- // write request
- grpc_byte_buffer *write_buffer = nullptr;
- bool success = SerializeProto(request, &write_buffer);
- if (!success) {
- grpc_call_cancel(call);
- status =
- Status(StatusCode::DATA_LOSS, "Failed to serialize request proto.");
- GetFinalStatus(cq, finished_tag, nullptr);
- return status;
- }
- GPR_ASSERT(grpc_call_start_write_old(call, write_buffer, write_tag,
- GRPC_WRITE_BUFFER_HINT) == GRPC_CALL_OK);
- grpc_byte_buffer_destroy(write_buffer);
- ev = grpc_completion_queue_pluck(cq, write_tag, gpr_inf_future);
-
- success = ev->data.write_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- if (!success) {
- GetFinalStatus(cq, finished_tag, &status);
- return status;
- }
- // writes done
- GPR_ASSERT(grpc_call_writes_done_old(call, halfclose_tag) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, halfclose_tag, gpr_inf_future);
- grpc_event_finish(ev);
- // start read metadata
- //
- ev = grpc_completion_queue_pluck(cq, metadata_read_tag, gpr_inf_future);
- grpc_event_finish(ev);
- // start read
- GPR_ASSERT(grpc_call_start_read_old(call, read_tag) == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq, read_tag, gpr_inf_future);
- if (ev->data.read) {
- if (!DeserializeProto(ev->data.read, result)) {
- grpc_event_finish(ev);
- status = Status(StatusCode::DATA_LOSS, "Failed to parse response proto.");
- GetFinalStatus(cq, finished_tag, nullptr);
- return status;
- }
- }
- grpc_event_finish(ev);
-
- // wait status
- GetFinalStatus(cq, finished_tag, &status);
- return status;
-}
-
-StreamContextInterface *Channel::CreateStream(
- const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result) {
- grpc_call *call = grpc_channel_create_call_old(
- c_channel_, method.name(), target_.c_str(), context->RawDeadline());
- context->set_call(call);
- grpc_completion_queue *cq = grpc_completion_queue_create();
- context->set_cq(cq);
- return new StreamContext(method, context, request, result);
+void Channel::PerformOpsOnCall(CallOpBuffer *buf, Call *call) {
+ static const size_t MAX_OPS = 8;
+ size_t nops = MAX_OPS;
+ grpc_op ops[MAX_OPS];
+ buf->FillOps(ops, &nops);
+ GPR_ASSERT(GRPC_CALL_OK ==
+ grpc_call_start_batch(call->call(), ops, nops,
+ buf));
}
} // namespace grpc
diff --git a/src/cpp/client/channel.h b/src/cpp/client/channel.h
index 67d18bf4c8..894f87698c 100644
--- a/src/cpp/client/channel.h
+++ b/src/cpp/client/channel.h
@@ -42,11 +42,14 @@
struct grpc_channel;
namespace grpc {
+class Call;
+class CallOpBuffer;
class ChannelArguments;
+class CompletionQueue;
class Credentials;
class StreamContextInterface;
-class Channel : public ChannelInterface {
+class Channel final : public ChannelInterface {
public:
Channel(const grpc::string &target, const ChannelArguments &args);
Channel(const grpc::string &target, const std::unique_ptr<Credentials> &creds,
@@ -54,14 +57,10 @@ class Channel : public ChannelInterface {
~Channel() override;
- Status StartBlockingRpc(const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message &request,
- google::protobuf::Message *result) override;
-
- StreamContextInterface *CreateStream(
- const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result) override;
+ virtual Call CreateCall(const RpcMethod &method, ClientContext *context,
+ CompletionQueue *cq) override;
+ virtual void PerformOpsOnCall(CallOpBuffer *ops,
+ Call *call) override;
private:
const grpc::string target_;
diff --git a/src/cpp/server/server_rpc_handler.h b/src/cpp/client/client_unary_call.cc
index a43e07dc5f..e652750e22 100644
--- a/src/cpp/server/server_rpc_handler.h
+++ b/src/cpp/client/client_unary_call.cc
@@ -31,36 +31,30 @@
*
*/
-#ifndef __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__
-#define __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__
-
-#include <memory>
-
+#include <grpc++/impl/client_unary_call.h>
+#include <grpc++/call.h>
+#include <grpc++/channel_interface.h>
#include <grpc++/completion_queue.h>
#include <grpc++/status.h>
namespace grpc {
-class AsyncServerContext;
-class RpcServiceMethod;
-
-class ServerRpcHandler {
- public:
- // Takes ownership of async_server_context.
- ServerRpcHandler(AsyncServerContext *async_server_context,
- RpcServiceMethod *method);
-
- void StartRpc();
-
- private:
- CompletionQueue::CompletionType WaitForNextEvent();
- void FinishRpc(const Status &status);
-
- std::unique_ptr<AsyncServerContext> async_server_context_;
- RpcServiceMethod *method_;
- CompletionQueue cq_;
-};
+// Wrapper that performs a blocking unary call
+Status BlockingUnaryCall(ChannelInterface *channel, const RpcMethod &method,
+ ClientContext *context,
+ const google::protobuf::Message &request,
+ google::protobuf::Message *result) {
+ CompletionQueue cq;
+ Call call(channel->CreateCall(method, context, &cq));
+ CallOpBuffer buf;
+ Status status;
+ buf.AddSendMessage(request);
+ buf.AddRecvMessage(result);
+ buf.AddClientSendClose();
+ buf.AddClientRecvStatus(&status);
+ call.PerformOps(&buf);
+ cq.Pluck(&buf);
+ return status;
+}
} // namespace grpc
-
-#endif // __GRPCPP_INTERNAL_SERVER_SERVER_RPC_HANDLER_H__
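For comparison, this is roughly what a generated unary stub body reduces to after the change (service, method, and path names are placeholders; see the cpp_generator.cc hunk above for the real template).

// Hypothetical generated stub body; "/echo.Echo/Say" stands in for the
// generated "/$Package$$Service$/$Method$" path.
::grpc::Status Echo::Stub::Say(::grpc::ClientContext* context,
                               const EchoRequest& request,
                               EchoResponse* response) {
  return ::grpc::BlockingUnaryCall(channel(),
                                   ::grpc::RpcMethod("/echo.Echo/Say"),
                                   context, request, response);
}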
diff --git a/include/grpc++/stream_context_interface.h b/src/cpp/common/call.cc
index a84119800b..d3a9de3620 100644
--- a/include/grpc++/stream_context_interface.h
+++ b/src/cpp/common/call.cc
@@ -31,34 +31,13 @@
*
*/
-#ifndef __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
-#define __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
+#include <include/grpc++/call.h>
+#include <include/grpc++/channel_interface.h>
namespace grpc {
-class Status;
-
-// An interface to avoid dependency on internal implementation.
-class StreamContextInterface {
- public:
- virtual ~StreamContextInterface() {}
- virtual void Start(bool buffered) = 0;
-
- virtual bool Read(google::protobuf::Message* msg) = 0;
- virtual bool Write(const google::protobuf::Message* msg, bool is_last) = 0;
- virtual const Status& Wait() = 0;
- virtual void Cancel() = 0;
-
- virtual google::protobuf::Message* request() = 0;
- virtual google::protobuf::Message* response() = 0;
-};
+void Call::PerformOps(CallOpBuffer* buffer) {
+ channel_->PerformOpsOnCall(buffer, this);
+}
} // namespace grpc
-
-#endif // __GRPCPP_STREAM_CONTEXT_INTERFACE_H__
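Call::PerformOps is the last hop before the C core: together with Channel::PerformOpsOnCall above, every wrapper (unary helper, readers, writers) follows the same path of filling a CallOpBuffer, handing it to the call, and waiting on the owning queue. A condensed sketch using only types introduced in this commit; the channel, method, context, and request arguments are assumed to be set up by the caller.

// Sketch of issuing one batch with the new primitives.
#include <grpc++/call.h>
#include <grpc++/channel_interface.h>
#include <grpc++/completion_queue.h>

bool SendAndClose(grpc::ChannelInterface* channel,
                  const grpc::RpcMethod& method,
                  grpc::ClientContext* context,
                  const google::protobuf::Message& request) {
  grpc::CompletionQueue cq;
  grpc::Call call = channel->CreateCall(method, context, &cq);
  grpc::CallOpBuffer buf;
  buf.AddSendMessage(request);   // serialize and queue the proto
  buf.AddClientSendClose();      // half-close from the client side
  call.PerformOps(&buf);         // Channel::PerformOpsOnCall ->
                                 // grpc_call_start_batch()
  return cq.Pluck(&buf);         // block until this buffer completes
}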
diff --git a/src/cpp/common/completion_queue.cc b/src/cpp/common/completion_queue.cc
index f06da9b04f..cbeda81a0b 100644
--- a/src/cpp/common/completion_queue.cc
+++ b/src/cpp/common/completion_queue.cc
@@ -1,5 +1,4 @@
/*
- *
* Copyright 2014, Google Inc.
* All rights reserved.
*
@@ -33,11 +32,12 @@
#include <grpc++/completion_queue.h>
+#include <memory>
+
#include <grpc/grpc.h>
#include <grpc/support/log.h>
#include <grpc/support/time.h>
#include "src/cpp/util/time.h"
-#include <grpc++/async_server_context.h>
namespace grpc {
@@ -47,66 +47,35 @@ CompletionQueue::~CompletionQueue() { grpc_completion_queue_destroy(cq_); }
void CompletionQueue::Shutdown() { grpc_completion_queue_shutdown(cq_); }
-CompletionQueue::CompletionType CompletionQueue::Next(void **tag) {
- grpc_event *ev;
- CompletionType return_type;
- bool success;
+// Helper class so we can declare a unique_ptr with grpc_event
+class EventDeleter {
+ public:
+ void operator()(grpc_event *ev) { if (ev) grpc_event_finish(ev); }
+};
- ev = grpc_completion_queue_next(cq_, gpr_inf_future);
- if (!ev) {
- gpr_log(GPR_ERROR, "no next event in queue");
- abort();
- }
- switch (ev->type) {
- case GRPC_QUEUE_SHUTDOWN:
- return_type = QUEUE_CLOSED;
- break;
- case GRPC_READ:
- *tag = ev->tag;
- if (ev->data.read) {
- success = static_cast<AsyncServerContext *>(ev->tag)
- ->ParseRead(ev->data.read);
- return_type = success ? SERVER_READ_OK : SERVER_READ_ERROR;
- } else {
- return_type = SERVER_READ_ERROR;
- }
- break;
- case GRPC_WRITE_ACCEPTED:
- *tag = ev->tag;
- if (ev->data.write_accepted != GRPC_OP_ERROR) {
- return_type = SERVER_WRITE_OK;
- } else {
- return_type = SERVER_WRITE_ERROR;
- }
- break;
- case GRPC_SERVER_RPC_NEW:
- GPR_ASSERT(!ev->tag);
- // Finishing the pending new rpcs after the server has been shutdown.
- if (!ev->call) {
- *tag = nullptr;
- } else {
- *tag = new AsyncServerContext(
- ev->call, ev->data.server_rpc_new.method,
- ev->data.server_rpc_new.host,
- Timespec2Timepoint(ev->data.server_rpc_new.deadline));
- }
- return_type = SERVER_RPC_NEW;
- break;
- case GRPC_FINISHED:
- *tag = ev->tag;
- return_type = RPC_END;
- break;
- case GRPC_FINISH_ACCEPTED:
- *tag = ev->tag;
- return_type = HALFCLOSE_OK;
- break;
- default:
- // We do not handle client side messages now
- gpr_log(GPR_ERROR, "client-side messages aren't supported yet");
- abort();
+bool CompletionQueue::Next(void **tag, bool *ok) {
+ std::unique_ptr<grpc_event, EventDeleter> ev;
+
+ ev.reset(grpc_completion_queue_next(cq_, gpr_inf_future));
+ if (ev->type == GRPC_QUEUE_SHUTDOWN) {
+ return false;
}
- grpc_event_finish(ev);
- return return_type;
+ auto cq_tag = static_cast<CompletionQueueTag *>(ev->tag);
+ *ok = ev->data.op_complete == GRPC_OP_OK;
+ *tag = cq_tag;
+ cq_tag->FinalizeResult(tag, ok);
+ return true;
+}
+
+bool CompletionQueue::Pluck(CompletionQueueTag *tag) {
+ std::unique_ptr<grpc_event, EventDeleter> ev;
+
+ ev.reset(grpc_completion_queue_pluck(cq_, tag, gpr_inf_future));
+ bool ok = ev->data.op_complete == GRPC_OP_OK;
+ void *ignored = tag;
+ tag->FinalizeResult(&ignored, &ok);
+ GPR_ASSERT(ignored == tag);
+ return ok;
}
} // namespace grpc
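The queue now hands back only the user's tag plus a success flag; FinalizeResult() runs inside Next()/Pluck(), so callers no longer switch on raw event types. A minimal polling loop against the new interface, with DispatchTag standing in for application logic:

// Minimal sketch of draining a CompletionQueue with the new Next().
#include <grpc++/completion_queue.h>

void PollLoop(grpc::CompletionQueue* cq) {
  void* tag = nullptr;
  bool ok = false;
  while (cq->Next(&tag, &ok)) {  // returns false once the queue shuts down
    if (ok) {
      DispatchTag(tag);          // hypothetical application dispatch
    }
  }
}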
diff --git a/src/cpp/server/async_server.cc b/src/cpp/server/async_server.cc
deleted file mode 100644
index 86faa07b31..0000000000
--- a/src/cpp/server/async_server.cc
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <grpc++/async_server.h>
-
-#include <grpc/grpc.h>
-#include <grpc/support/log.h>
-#include <grpc++/completion_queue.h>
-
-namespace grpc {
-
-AsyncServer::AsyncServer(CompletionQueue *cc)
- : started_(false), shutdown_(false) {
- server_ = grpc_server_create(cc->cq(), nullptr);
-}
-
-AsyncServer::~AsyncServer() {
- std::unique_lock<std::mutex> lock(shutdown_mu_);
- if (started_ && !shutdown_) {
- lock.unlock();
- Shutdown();
- }
- grpc_server_destroy(server_);
-}
-
-void AsyncServer::AddPort(const grpc::string &addr) {
- GPR_ASSERT(!started_);
- int success = grpc_server_add_http2_port(server_, addr.c_str());
- GPR_ASSERT(success);
-}
-
-void AsyncServer::Start() {
- GPR_ASSERT(!started_);
- started_ = true;
- grpc_server_start(server_);
-}
-
-void AsyncServer::RequestOneRpc() {
- GPR_ASSERT(started_);
- std::unique_lock<std::mutex> lock(shutdown_mu_);
- if (shutdown_) {
- return;
- }
- lock.unlock();
- grpc_call_error err = grpc_server_request_call_old(server_, nullptr);
- GPR_ASSERT(err == GRPC_CALL_OK);
-}
-
-void AsyncServer::Shutdown() {
- std::unique_lock<std::mutex> lock(shutdown_mu_);
- if (started_ && !shutdown_) {
- shutdown_ = true;
- lock.unlock();
- // TODO(yangg) should we shutdown without start?
- grpc_server_shutdown(server_);
- }
-}
-
-} // namespace grpc
diff --git a/src/cpp/server/server.cc b/src/cpp/server/server.cc
index 1abdf702e2..5d44ab2ba4 100644
--- a/src/cpp/server/server.cc
+++ b/src/cpp/server/server.cc
@@ -37,25 +37,20 @@
#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
#include <grpc/support/log.h>
-#include "src/cpp/server/server_rpc_handler.h"
-#include "src/cpp/server/thread_pool.h"
-#include <grpc++/async_server_context.h>
#include <grpc++/completion_queue.h>
#include <grpc++/impl/rpc_service_method.h>
+#include <grpc++/server_context.h>
#include <grpc++/server_credentials.h>
+#include <grpc++/thread_pool_interface.h>
namespace grpc {
-// TODO(rocking): consider a better default value like num of cores.
-static const int kNumThreads = 4;
-
-Server::Server(ThreadPoolInterface *thread_pool, ServerCredentials *creds)
+Server::Server(ThreadPoolInterface *thread_pool, bool thread_pool_owned, ServerCredentials *creds)
: started_(false),
shutdown_(false),
num_running_cb_(0),
- thread_pool_(thread_pool == nullptr ? new ThreadPool(kNumThreads)
- : thread_pool),
- thread_pool_owned_(thread_pool == nullptr),
+ thread_pool_(thread_pool),
+ thread_pool_owned_(thread_pool_owned),
secure_(creds != nullptr) {
if (creds) {
server_ =
@@ -75,6 +70,8 @@ Server::~Server() {
if (started_ && !shutdown_) {
lock.unlock();
Shutdown();
+ } else {
+ lock.unlock();
}
grpc_server_destroy(server_);
if (thread_pool_owned_) {
@@ -82,37 +79,38 @@ Server::~Server() {
}
}
-void Server::RegisterService(RpcService *service) {
+bool Server::RegisterService(RpcService *service) {
for (int i = 0; i < service->GetMethodCount(); ++i) {
RpcServiceMethod *method = service->GetMethod(i);
+ if (method_map_.find(method->name()) != method_map_.end()) {
+ gpr_log(GPR_DEBUG, "Attempt to register %s multiple times", method->name());
+ return false;
+ }
method_map_.insert(std::make_pair(method->name(), method));
}
+ return true;
}
-void Server::AddPort(const grpc::string &addr) {
+int Server::AddPort(const grpc::string &addr) {
GPR_ASSERT(!started_);
- int success;
if (secure_) {
- success = grpc_server_add_secure_http2_port(server_, addr.c_str());
+ return grpc_server_add_secure_http2_port(server_, addr.c_str());
} else {
- success = grpc_server_add_http2_port(server_, addr.c_str());
+ return grpc_server_add_http2_port(server_, addr.c_str());
}
- GPR_ASSERT(success);
}
-void Server::Start() {
+bool Server::Start() {
GPR_ASSERT(!started_);
started_ = true;
grpc_server_start(server_);
// Start processing rpcs.
- ScheduleCallback();
-}
+ if (thread_pool_) {
+ ScheduleCallback();
+ }
-void Server::AllowOneRpc() {
- GPR_ASSERT(started_);
- grpc_call_error err = grpc_server_request_call_old(server_, nullptr);
- GPR_ASSERT(err == GRPC_CALL_OK);
+ return true;
}
void Server::Shutdown() {
@@ -132,8 +130,8 @@ void Server::Shutdown() {
// Shutdown the completion queue.
cq_.Shutdown();
void *tag = nullptr;
- CompletionQueue::CompletionType t = cq_.Next(&tag);
- GPR_ASSERT(t == CompletionQueue::QUEUE_CLOSED);
+ bool ok = false;
+ GPR_ASSERT(false == cq_.Next(&tag, &ok));
}
void Server::ScheduleCallback() {
@@ -141,31 +139,40 @@ void Server::ScheduleCallback() {
std::unique_lock<std::mutex> lock(mu_);
num_running_cb_++;
}
- std::function<void()> callback = std::bind(&Server::RunRpc, this);
- thread_pool_->ScheduleCallback(callback);
+ thread_pool_->ScheduleCallback(std::bind(&Server::RunRpc, this));
}
void Server::RunRpc() {
// Wait for one more incoming rpc.
void *tag = nullptr;
- AllowOneRpc();
- CompletionQueue::CompletionType t = cq_.Next(&tag);
- GPR_ASSERT(t == CompletionQueue::SERVER_RPC_NEW);
-
- AsyncServerContext *server_context = static_cast<AsyncServerContext *>(tag);
- // server_context could be nullptr during server shutdown.
- if (server_context != nullptr) {
- // Schedule a new callback to handle more rpcs.
+ GPR_ASSERT(started_);
+ grpc_call *c_call = NULL;
+ grpc_call_details call_details;
+ grpc_call_details_init(&call_details);
+ grpc_metadata_array initial_metadata;
+ grpc_metadata_array_init(&initial_metadata);
+ CompletionQueue cq;
+ grpc_call_error err = grpc_server_request_call(server_, &c_call, &call_details, &initial_metadata, cq.cq(), nullptr);
+ GPR_ASSERT(err == GRPC_CALL_OK);
+ bool ok = false;
+ GPR_ASSERT(cq_.Next(&tag, &ok));
+ if (ok) {
+ ServerContext context;
+ Call call(c_call, nullptr, &cq);
ScheduleCallback();
-
RpcServiceMethod *method = nullptr;
- auto iter = method_map_.find(server_context->method());
+ auto iter = method_map_.find(call_details.method);
if (iter != method_map_.end()) {
method = iter->second;
}
- ServerRpcHandler rpc_handler(server_context, method);
- rpc_handler.StartRpc();
+ // TODO(ctiller): allocate only if necessary
+ std::unique_ptr<google::protobuf::Message> request(method->AllocateRequestProto());
+ std::unique_ptr<google::protobuf::Message> response(method->AllocateResponseProto());
+ method->handler()->RunHandler(MethodHandler::HandlerParameter(
+ &call, &context, request.get(), response.get()));
}
+ grpc_call_details_destroy(&call_details);
+ grpc_metadata_array_destroy(&initial_metadata);
{
std::unique_lock<std::mutex> lock(mu_);
diff --git a/src/cpp/server/server_builder.cc b/src/cpp/server/server_builder.cc
index add22cc3d8..d6bcb9313a 100644
--- a/src/cpp/server/server_builder.cc
+++ b/src/cpp/server/server_builder.cc
@@ -33,15 +33,22 @@
#include <grpc++/server_builder.h>
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
+#include <grpc++/impl/service_type.h>
#include <grpc++/server.h>
+#include "src/cpp/server/thread_pool.h"
namespace grpc {
-ServerBuilder::ServerBuilder() : thread_pool_(nullptr) {}
+ServerBuilder::ServerBuilder() {}
-void ServerBuilder::RegisterService(RpcService *service) {
- services_.push_back(service);
+void ServerBuilder::RegisterService(SynchronousService *service) {
+ services_.push_back(service->service());
+}
+
+void ServerBuilder::RegisterAsyncService(AsynchronousService *service) {
+ async_services_.push_back(service);
}
void ServerBuilder::AddPort(const grpc::string &addr) {
@@ -59,14 +66,31 @@ void ServerBuilder::SetThreadPool(ThreadPoolInterface *thread_pool) {
}
std::unique_ptr<Server> ServerBuilder::BuildAndStart() {
- std::unique_ptr<Server> server(new Server(thread_pool_, creds_.get()));
+ bool thread_pool_owned = false;
+ if (!async_services_.empty() && !services_.empty()) {
+ gpr_log(GPR_ERROR, "Mixing async and sync services is unsupported for now");
+ return nullptr;
+ }
+ if (!thread_pool_ && services_.size()) {
+ int cores = gpr_cpu_num_cores();
+ if (!cores) cores = 4;
+ thread_pool_ = new ThreadPool(cores);
+ thread_pool_owned = true;
+ }
+ std::unique_ptr<Server> server(new Server(thread_pool_, thread_pool_owned, creds_.get()));
for (auto *service : services_) {
- server->RegisterService(service);
+ if (!server->RegisterService(service)) {
+ return nullptr;
+ }
}
for (auto &port : ports_) {
- server->AddPort(port);
+ if (!server->AddPort(port)) {
+ return nullptr;
+ }
+ }
+ if (!server->Start()) {
+ return nullptr;
}
- server->Start();
return server;
}
diff --git a/src/cpp/server/server_rpc_handler.cc b/src/cpp/server/server_rpc_handler.cc
deleted file mode 100644
index bf02de8b80..0000000000
--- a/src/cpp/server/server_rpc_handler.cc
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/cpp/server/server_rpc_handler.h"
-
-#include <grpc/support/log.h>
-#include "src/cpp/server/server_context_impl.h"
-#include "src/cpp/stream/stream_context.h"
-#include <grpc++/async_server_context.h>
-#include <grpc++/impl/rpc_service_method.h>
-
-namespace grpc {
-
-ServerRpcHandler::ServerRpcHandler(AsyncServerContext *async_server_context,
- RpcServiceMethod *method)
- : async_server_context_(async_server_context), method_(method) {}
-
-void ServerRpcHandler::StartRpc() {
- if (method_ == nullptr) {
- // Method not supported, finish the rpc with error.
- // TODO(rocking): do we need to call read to consume the request?
- FinishRpc(Status(StatusCode::UNIMPLEMENTED, "No such method."));
- return;
- }
-
- ServerContextImpl user_context(async_server_context_->absolute_deadline());
-
- if (method_->method_type() == RpcMethod::NORMAL_RPC) {
- // Start the rpc on this dedicated completion queue.
- async_server_context_->Accept(cq_.cq());
-
- // Allocate request and response.
- std::unique_ptr<google::protobuf::Message> request(
- method_->AllocateRequestProto());
- std::unique_ptr<google::protobuf::Message> response(
- method_->AllocateResponseProto());
-
- // Read request
- async_server_context_->StartRead(request.get());
- auto type = WaitForNextEvent();
- GPR_ASSERT(type == CompletionQueue::SERVER_READ_OK);
-
- // Run the application's rpc handler
- MethodHandler *handler = method_->handler();
- Status status = handler->RunHandler(MethodHandler::HandlerParameter(
- &user_context, request.get(), response.get()));
-
- if (status.IsOk()) {
- // Send the response if we get an ok status.
- async_server_context_->StartWrite(*response, GRPC_WRITE_BUFFER_HINT);
- type = WaitForNextEvent();
- if (type != CompletionQueue::SERVER_WRITE_OK) {
- status = Status(StatusCode::INTERNAL, "Error writing response.");
- }
- }
-
- FinishRpc(status);
- } else {
- // Allocate request and response.
- // TODO(yangg) maybe not allocate both when not needed?
- std::unique_ptr<google::protobuf::Message> request(
- method_->AllocateRequestProto());
- std::unique_ptr<google::protobuf::Message> response(
- method_->AllocateResponseProto());
-
- StreamContext stream_context(*method_, async_server_context_->call(),
- cq_.cq(), request.get(), response.get());
-
- // Run the application's rpc handler
- MethodHandler *handler = method_->handler();
- Status status = handler->RunHandler(MethodHandler::HandlerParameter(
- &user_context, request.get(), response.get(), &stream_context));
- if (status.IsOk() &&
- method_->method_type() == RpcMethod::CLIENT_STREAMING) {
- stream_context.Write(response.get(), false);
- }
- // TODO(yangg) Do we need to consider the status in stream_context?
- FinishRpc(status);
- }
-}
-
-CompletionQueue::CompletionType ServerRpcHandler::WaitForNextEvent() {
- void *tag = nullptr;
- CompletionQueue::CompletionType type = cq_.Next(&tag);
- if (type != CompletionQueue::QUEUE_CLOSED &&
- type != CompletionQueue::RPC_END) {
- GPR_ASSERT(static_cast<AsyncServerContext *>(tag) ==
- async_server_context_.get());
- }
- return type;
-}
-
-void ServerRpcHandler::FinishRpc(const Status &status) {
- async_server_context_->StartWriteStatus(status);
- CompletionQueue::CompletionType type;
-
- // HALFCLOSE_OK and RPC_END events come in either order.
- type = WaitForNextEvent();
- GPR_ASSERT(type == CompletionQueue::HALFCLOSE_OK ||
- type == CompletionQueue::RPC_END);
- type = WaitForNextEvent();
- GPR_ASSERT(type == CompletionQueue::HALFCLOSE_OK ||
- type == CompletionQueue::RPC_END);
-
- cq_.Shutdown();
- type = WaitForNextEvent();
- GPR_ASSERT(type == CompletionQueue::QUEUE_CLOSED);
-}
-
-} // namespace grpc
diff --git a/src/cpp/server/thread_pool.h b/src/cpp/server/thread_pool.h
index c53f7a7517..8a28c87704 100644
--- a/src/cpp/server/thread_pool.h
+++ b/src/cpp/server/thread_pool.h
@@ -44,12 +44,12 @@
namespace grpc {
-class ThreadPool : public ThreadPoolInterface {
+class ThreadPool final : public ThreadPoolInterface {
public:
explicit ThreadPool(int num_threads);
~ThreadPool();
- void ScheduleCallback(const std::function<void()> &callback) final;
+ void ScheduleCallback(const std::function<void()> &callback) override;
private:
std::mutex mu_;
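
The thread_pool.h hunk above is the usual C++11 tightening: the concrete pool class becomes final and its virtual implementation is spelled override. A self-contained illustration of the pattern with placeholder names (not the real grpc++ headers):

#include <functional>

class ThreadPoolInterfaceDemo {
 public:
  virtual ~ThreadPoolInterfaceDemo() = default;
  virtual void ScheduleCallback(const std::function<void()>& callback) = 0;
};

// 'final' forbids further derivation; 'override' makes the compiler verify
// that the signature actually matches the base class declaration.
class ThreadPoolDemo final : public ThreadPoolInterfaceDemo {
 public:
  void ScheduleCallback(const std::function<void()>& callback) override {
    callback();  // a real pool would hand this to a worker thread
  }
};

int main() {
  ThreadPoolDemo pool;
  int calls = 0;
  pool.ScheduleCallback([&calls] { ++calls; });
  return calls == 1 ? 0 : 1;
}
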
diff --git a/src/cpp/stream/stream_context.cc b/src/cpp/stream/stream_context.cc
deleted file mode 100644
index e4f344dbb9..0000000000
--- a/src/cpp/stream/stream_context.cc
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "src/cpp/stream/stream_context.h"
-
-#include <grpc/support/log.h>
-#include "src/cpp/proto/proto_utils.h"
-#include "src/cpp/util/time.h"
-#include <grpc++/client_context.h>
-#include <grpc++/config.h>
-#include <grpc++/impl/rpc_method.h>
-#include <google/protobuf/message.h>
-
-namespace grpc {
-
-// Client only ctor
-StreamContext::StreamContext(const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result)
- : is_client_(true),
- method_(&method),
- call_(context->call()),
- cq_(context->cq()),
- request_(const_cast<google::protobuf::Message *>(request)),
- result_(result),
- peer_halfclosed_(false),
- self_halfclosed_(false) {
- GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC);
-}
-
-// Server only ctor
-StreamContext::StreamContext(const RpcMethod &method, grpc_call *call,
- grpc_completion_queue *cq,
- google::protobuf::Message *request,
- google::protobuf::Message *result)
- : is_client_(false),
- method_(&method),
- call_(call),
- cq_(cq),
- request_(request),
- result_(result),
- peer_halfclosed_(false),
- self_halfclosed_(false) {
- GPR_ASSERT(method_->method_type() != RpcMethod::RpcType::NORMAL_RPC);
-}
-
-StreamContext::~StreamContext() {}
-
-void StreamContext::Start(bool buffered) {
- if (is_client_) {
- // TODO(yangg) handle metadata send path
- int flag = buffered ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error error = grpc_call_invoke_old(
- call(), cq(), client_metadata_read_tag(), finished_tag(), flag);
- GPR_ASSERT(GRPC_CALL_OK == error);
- } else {
- // TODO(yangg) metadata needs to be added before accept
- // TODO(yangg) correctly set flag to accept
- GPR_ASSERT(grpc_call_server_accept_old(call(), cq(), finished_tag()) ==
- GRPC_CALL_OK);
- GPR_ASSERT(grpc_call_server_end_initial_metadata_old(call(), 0) ==
- GRPC_CALL_OK);
- }
-}
-
-bool StreamContext::Read(google::protobuf::Message *msg) {
- // TODO(yangg) check peer_halfclosed_ here for possible early return.
- grpc_call_error err = grpc_call_start_read_old(call(), read_tag());
- GPR_ASSERT(err == GRPC_CALL_OK);
- grpc_event *read_ev =
- grpc_completion_queue_pluck(cq(), read_tag(), gpr_inf_future);
- GPR_ASSERT(read_ev->type == GRPC_READ);
- bool ret = true;
- if (read_ev->data.read) {
- if (!DeserializeProto(read_ev->data.read, msg)) {
- ret = false;
- grpc_call_cancel_with_status(call(), GRPC_STATUS_DATA_LOSS,
- "Failed to parse incoming proto");
- }
- } else {
- ret = false;
- peer_halfclosed_ = true;
- }
- grpc_event_finish(read_ev);
- return ret;
-}
-
-bool StreamContext::Write(const google::protobuf::Message *msg, bool is_last) {
- // TODO(yangg) check self_halfclosed_ for possible early return.
- bool ret = true;
- grpc_event *ev = nullptr;
-
- if (msg) {
- grpc_byte_buffer *out_buf = nullptr;
- if (!SerializeProto(*msg, &out_buf)) {
- grpc_call_cancel_with_status(call(), GRPC_STATUS_INVALID_ARGUMENT,
- "Failed to serialize outgoing proto");
- return false;
- }
- int flag = is_last ? GRPC_WRITE_BUFFER_HINT : 0;
- grpc_call_error err =
- grpc_call_start_write_old(call(), out_buf, write_tag(), flag);
- grpc_byte_buffer_destroy(out_buf);
- GPR_ASSERT(err == GRPC_CALL_OK);
-
- ev = grpc_completion_queue_pluck(cq(), write_tag(), gpr_inf_future);
- GPR_ASSERT(ev->type == GRPC_WRITE_ACCEPTED);
-
- ret = ev->data.write_accepted == GRPC_OP_OK;
- grpc_event_finish(ev);
- }
- if (ret && is_last) {
- grpc_call_error err = grpc_call_writes_done_old(call(), halfclose_tag());
- GPR_ASSERT(err == GRPC_CALL_OK);
- ev = grpc_completion_queue_pluck(cq(), halfclose_tag(), gpr_inf_future);
- GPR_ASSERT(ev->type == GRPC_FINISH_ACCEPTED);
- grpc_event_finish(ev);
-
- self_halfclosed_ = true;
- } else if (!ret) { // Stream broken
- self_halfclosed_ = true;
- peer_halfclosed_ = true;
- }
-
- return ret;
-}
-
-const Status &StreamContext::Wait() {
- // TODO(yangg) properly support metadata
- grpc_event *metadata_ev = grpc_completion_queue_pluck(
- cq(), client_metadata_read_tag(), gpr_inf_future);
- grpc_event_finish(metadata_ev);
- // TODO(yangg) protect states by a mutex, including other places.
- if (!self_halfclosed_ || !peer_halfclosed_) {
- Cancel();
- }
- grpc_event *finish_ev =
- grpc_completion_queue_pluck(cq(), finished_tag(), gpr_inf_future);
- GPR_ASSERT(finish_ev->type == GRPC_FINISHED);
- final_status_ = Status(
- static_cast<StatusCode>(finish_ev->data.finished.status),
- finish_ev->data.finished.details ? finish_ev->data.finished.details : "");
- grpc_event_finish(finish_ev);
- return final_status_;
-}
-
-void StreamContext::Cancel() { grpc_call_cancel(call()); }
-
-} // namespace grpc
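
In the deleted StreamContext::Read above, a null read payload is how the old core API signals that the peer half-closed, while a payload that fails to parse cancels the call with DATA_LOSS. A small stand-alone sketch of that convention (placeholder types, no gRPC dependencies):

#include <cassert>
#include <string>

struct FakeReader {
  bool peer_halfclosed = false;

  // 'payload' models grpc_event.data.read: a null pointer means the peer
  // half-closed, so Read reports failure without treating it as an error.
  bool Read(const std::string* payload, std::string* msg) {
    if (payload == nullptr) {
      peer_halfclosed = true;
      return false;
    }
    *msg = *payload;  // the real code deserializes a protobuf here
    return true;
  }
};

int main() {
  FakeReader reader;
  std::string msg;
  const std::string hello = "hello";
  assert(reader.Read(&hello, &msg) && msg == "hello");
  assert(!reader.Read(nullptr, &msg) && reader.peer_halfclosed);
  return 0;
}
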
diff --git a/src/cpp/stream/stream_context.h b/src/cpp/stream/stream_context.h
deleted file mode 100644
index 8def589841..0000000000
--- a/src/cpp/stream/stream_context.h
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#ifndef __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__
-#define __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__
-
-#include <grpc/grpc.h>
-#include <grpc++/status.h>
-#include <grpc++/stream_context_interface.h>
-
-namespace google {
-namespace protobuf {
-class Message;
-}
-}
-
-namespace grpc {
-class ClientContext;
-class RpcMethod;
-
-class StreamContext final : public StreamContextInterface {
- public:
- StreamContext(const RpcMethod &method, ClientContext *context,
- const google::protobuf::Message *request,
- google::protobuf::Message *result);
- StreamContext(const RpcMethod &method, grpc_call *call,
- grpc_completion_queue *cq, google::protobuf::Message *request,
- google::protobuf::Message *result);
- ~StreamContext();
- // Start the stream; if a final write follows immediately, set buffered so
- // that the messages can be sent as a batch.
- void Start(bool buffered) override;
- bool Read(google::protobuf::Message *msg) override;
- bool Write(const google::protobuf::Message *msg, bool is_last) override;
- const Status &Wait() override;
- void Cancel() override;
-
- google::protobuf::Message *request() override { return request_; }
- google::protobuf::Message *response() override { return result_; }
-
- private:
- // Unique tags for plucking events from the C layer. The this pointer is cast
- // to char* to create a single-byte step between tags. This implicitly relies
- // on StreamContext being large enough to span all the tag offsets.
- void *finished_tag() { return reinterpret_cast<char *>(this); }
- void *read_tag() { return reinterpret_cast<char *>(this) + 1; }
- void *write_tag() { return reinterpret_cast<char *>(this) + 2; }
- void *halfclose_tag() { return reinterpret_cast<char *>(this) + 3; }
- void *client_metadata_read_tag() {
- return reinterpret_cast<char *>(this) + 5;
- }
- grpc_call *call() { return call_; }
- grpc_completion_queue *cq() { return cq_; }
-
- bool is_client_;
- const RpcMethod *method_; // not owned
- grpc_call *call_; // not owned
- grpc_completion_queue *cq_; // not owned
- google::protobuf::Message *request_; // first request, not owned
- google::protobuf::Message *result_; // last response, not owned
-
- bool peer_halfclosed_;
- bool self_halfclosed_;
- Status final_status_;
-};
-
-} // namespace grpc
-
-#endif // __GRPCPP_INTERNAL_STREAM_STREAM_CONTEXT_H__
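
The private tag helpers in the deleted header derive distinct void* tags by offsetting a char* from this, avoiding per-operation tag allocations; as the comment notes, the object must be large enough to span every offset. A self-contained sketch of the trick (placeholder class, not gRPC code):

#include <cassert>

class TaggedOps {
 public:
  void* finished_tag() { return reinterpret_cast<char*>(this) + 0; }
  void* read_tag()     { return reinterpret_cast<char*>(this) + 1; }
  void* write_tag()    { return reinterpret_cast<char*>(this) + 2; }

  // Map a completed tag back to the operation index it encodes.
  int OpFromTag(void* tag) {
    return static_cast<int>(reinterpret_cast<char*>(tag) -
                            reinterpret_cast<char*>(this));
  }

 private:
  char reserved_[8];  // keep the object large enough to span every tag offset
};

int main() {
  TaggedOps ops;
  assert(ops.OpFromTag(ops.finished_tag()) == 0);
  assert(ops.OpFromTag(ops.read_tag()) == 1);
  assert(ops.OpFromTag(ops.write_tag()) == 2);
  return 0;
}
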
diff --git a/test/core/statistics/census_log_tests.c b/test/core/statistics/census_log_tests.c
index c7b2b2e46d..e2ad78a6f2 100644
--- a/test/core/statistics/census_log_tests.c
+++ b/test/core/statistics/census_log_tests.c
@@ -35,7 +35,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
-#include "src/core/support/cpu.h"
+#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
diff --git a/test/cpp/end2end/async_test_server.cc b/test/cpp/end2end/async_test_server.cc
deleted file mode 100644
index f18b6c00bc..0000000000
--- a/test/cpp/end2end/async_test_server.cc
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include "test/cpp/end2end/async_test_server.h"
-
-#include <chrono>
-
-#include <grpc/support/log.h>
-#include "src/cpp/proto/proto_utils.h"
-#include "test/cpp/util/echo.pb.h"
-#include <grpc++/async_server.h>
-#include <grpc++/async_server_context.h>
-#include <grpc++/completion_queue.h>
-#include <grpc++/status.h>
-#include <gtest/gtest.h>
-
-using grpc::cpp::test::util::EchoRequest;
-using grpc::cpp::test::util::EchoResponse;
-
-using std::chrono::duration_cast;
-using std::chrono::microseconds;
-using std::chrono::seconds;
-using std::chrono::system_clock;
-
-namespace grpc {
-namespace testing {
-
-AsyncTestServer::AsyncTestServer() : server_(&cq_), cq_drained_(false) {}
-
-AsyncTestServer::~AsyncTestServer() {}
-
-void AsyncTestServer::AddPort(const grpc::string& addr) {
- server_.AddPort(addr);
-}
-
-void AsyncTestServer::Start() { server_.Start(); }
-
-// Return true if the actual deadline is within 0.5s of the expected one.
-bool DeadlineMatched(const system_clock::time_point& actual,
- const system_clock::time_point& expected) {
- microseconds diff_usecs = duration_cast<microseconds>(expected - actual);
- gpr_log(GPR_INFO, "diff_usecs= %d", diff_usecs.count());
- return diff_usecs.count() < 500000 && diff_usecs.count() > -500000;
-}
-
-void AsyncTestServer::RequestOneRpc() { server_.RequestOneRpc(); }
-
-void AsyncTestServer::MainLoop() {
- EchoRequest request;
- EchoResponse response;
- void* tag = nullptr;
-
- RequestOneRpc();
-
- while (true) {
- CompletionQueue::CompletionType t = cq_.Next(&tag);
- AsyncServerContext* server_context = static_cast<AsyncServerContext*>(tag);
- switch (t) {
- case CompletionQueue::SERVER_RPC_NEW:
- gpr_log(GPR_INFO, "SERVER_RPC_NEW %p", server_context);
- if (server_context) {
- EXPECT_EQ(server_context->method(), "/foo");
- // TODO(ctiller): verify deadline
- server_context->Accept(cq_.cq());
- // Handle only one rpc at a time.
- RequestOneRpc();
- server_context->StartRead(&request);
- }
- break;
- case CompletionQueue::RPC_END:
- gpr_log(GPR_INFO, "RPC_END %p", server_context);
- delete server_context;
- break;
- case CompletionQueue::SERVER_READ_OK:
- gpr_log(GPR_INFO, "SERVER_READ_OK %p", server_context);
- response.set_message(request.message());
- server_context->StartWrite(response, 0);
- break;
- case CompletionQueue::SERVER_READ_ERROR:
- gpr_log(GPR_INFO, "SERVER_READ_ERROR %p", server_context);
- server_context->StartWriteStatus(Status::OK);
- break;
- case CompletionQueue::HALFCLOSE_OK:
- gpr_log(GPR_INFO, "HALFCLOSE_OK %p", server_context);
- // Do nothing, just wait for RPC_END.
- break;
- case CompletionQueue::SERVER_WRITE_OK:
- gpr_log(GPR_INFO, "SERVER_WRITE_OK %p", server_context);
- server_context->StartRead(&request);
- break;
- case CompletionQueue::SERVER_WRITE_ERROR:
- EXPECT_TRUE(0);
- break;
- case CompletionQueue::QUEUE_CLOSED: {
- gpr_log(GPR_INFO, "QUEUE_CLOSED");
- HandleQueueClosed();
- return;
- }
- default:
- EXPECT_TRUE(0);
- break;
- }
- }
-}
-
-void AsyncTestServer::HandleQueueClosed() {
- std::unique_lock<std::mutex> lock(cq_drained_mu_);
- cq_drained_ = true;
- cq_drained_cv_.notify_all();
-}
-
-void AsyncTestServer::Shutdown() {
- // The server needs to be shut down before cq_ because grpc_server flushes
- // all pending requested calls to the completion queue at shutdown.
- server_.Shutdown();
- cq_.Shutdown();
- std::unique_lock<std::mutex> lock(cq_drained_mu_);
- while (!cq_drained_) {
- cq_drained_cv_.wait(lock);
- }
-}
-
-} // namespace testing
-} // namespace grpc
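
The deleted AsyncTestServer::MainLoop is a single-threaded completion-queue pump: pull the next event, switch on its type, keep exactly one outstanding RPC request, and exit once the queue reports closed. A toy, self-contained version of that dispatch skeleton (the enum and event source are placeholders, not the gRPC CompletionQueue):

#include <cstdio>
#include <queue>

enum class EventType { kRpcNew, kReadOk, kWriteOk, kRpcEnd, kQueueClosed };

// Pump events until the queue is closed; each case mirrors one branch of the
// deleted switch (accept + read, echo a write, release state, tear down).
void MainLoop(std::queue<EventType>& events) {
  while (!events.empty()) {
    EventType t = events.front();
    events.pop();
    switch (t) {
      case EventType::kRpcNew:      std::printf("accept + start read\n"); break;
      case EventType::kReadOk:      std::printf("echo response write\n"); break;
      case EventType::kWriteOk:     std::printf("start next read\n"); break;
      case EventType::kRpcEnd:      std::printf("release per-rpc state\n"); break;
      case EventType::kQueueClosed: std::printf("drained, exiting\n"); return;
    }
  }
}

int main() {
  std::queue<EventType> events;
  for (EventType t : {EventType::kRpcNew, EventType::kReadOk,
                      EventType::kWriteOk, EventType::kRpcEnd,
                      EventType::kQueueClosed}) {
    events.push(t);
  }
  MainLoop(events);
  return 0;
}
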
diff --git a/test/cpp/end2end/end2end_test.cc b/test/cpp/end2end/end2end_test.cc
index 4dea77ea81..52deb0f32d 100644
--- a/test/cpp/end2end/end2end_test.cc
+++ b/test/cpp/end2end/end2end_test.cc
@@ -147,8 +147,8 @@ class End2endTest : public ::testing::Test {
// Setup server
ServerBuilder builder;
builder.AddPort(server_address_.str());
- builder.RegisterService(service_.service());
- builder.RegisterService(dup_pkg_service_.service());
+ builder.RegisterService(&service_);
+ builder.RegisterService(&dup_pkg_service_);
server_ = builder.BuildAndStart();
}
@@ -290,7 +290,7 @@ TEST_F(End2endTest, RequestStreamOneRequest) {
request.set_message("hello");
EXPECT_TRUE(stream->Write(request));
stream->WritesDone();
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_EQ(response.message(), request.message());
EXPECT_TRUE(s.IsOk());
@@ -308,7 +308,7 @@ TEST_F(End2endTest, RequestStreamTwoRequests) {
EXPECT_TRUE(stream->Write(request));
EXPECT_TRUE(stream->Write(request));
stream->WritesDone();
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_EQ(response.message(), "hellohello");
EXPECT_TRUE(s.IsOk());
@@ -332,7 +332,7 @@ TEST_F(End2endTest, ResponseStream) {
EXPECT_EQ(response.message(), request.message() + "2");
EXPECT_FALSE(stream->Read(&response));
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
@@ -366,7 +366,7 @@ TEST_F(End2endTest, BidiStream) {
stream->WritesDone();
EXPECT_FALSE(stream->Read(&response));
- Status s = stream->Wait();
+ Status s = stream->Finish();
EXPECT_TRUE(s.IsOk());
delete stream;
@@ -422,7 +422,7 @@ TEST_F(End2endTest, BadCredentials) {
ClientContext context2;
ClientReaderWriter<EchoRequest, EchoResponse>* stream =
stub->BidiStream(&context2);
- s = stream->Wait();
+ s = stream->Finish();
EXPECT_FALSE(s.IsOk());
EXPECT_EQ(StatusCode::UNKNOWN, s.code());
EXPECT_EQ("Rpc sent on a lame channel.", s.details());
diff --git a/test/cpp/end2end/sync_client_async_server_test.cc b/test/cpp/end2end/sync_client_async_server_test.cc
deleted file mode 100644
index 9955eb306f..0000000000
--- a/test/cpp/end2end/sync_client_async_server_test.cc
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- *
- * Copyright 2014, Google Inc.
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- *
- */
-
-#include <chrono>
-#include <memory>
-#include <sstream>
-#include <string>
-
-#include <grpc/grpc.h>
-#include <grpc/support/thd.h>
-#include "test/cpp/util/echo.pb.h"
-#include <grpc++/channel_arguments.h>
-#include <grpc++/channel_interface.h>
-#include <grpc++/client_context.h>
-#include <grpc++/create_channel.h>
-#include <grpc++/impl/internal_stub.h>
-#include <grpc++/impl/rpc_method.h>
-#include <grpc++/status.h>
-#include <grpc++/stream.h>
-#include "test/cpp/end2end/async_test_server.h"
-#include "test/core/util/port.h"
-#include <gtest/gtest.h>
-
-using grpc::cpp::test::util::EchoRequest;
-using grpc::cpp::test::util::EchoResponse;
-
-using std::chrono::duration_cast;
-using std::chrono::microseconds;
-using std::chrono::seconds;
-using std::chrono::system_clock;
-
-using grpc::testing::AsyncTestServer;
-
-namespace grpc {
-namespace {
-
-void ServerLoop(void* s) {
- AsyncTestServer* server = static_cast<AsyncTestServer*>(s);
- server->MainLoop();
-}
-
-class End2endTest : public ::testing::Test {
- protected:
- void SetUp() override {
- int port = grpc_pick_unused_port_or_die();
- // TODO(yangg) protobuf has a StringPrintf, maybe use that
- std::ostringstream oss;
- oss << "[::]:" << port;
- // Setup server
- server_.reset(new AsyncTestServer());
- server_->AddPort(oss.str());
- server_->Start();
-
- RunServerThread();
-
- // Setup client
- oss.str("");
- oss << "127.0.0.1:" << port;
- std::shared_ptr<ChannelInterface> channel =
- CreateChannel(oss.str(), ChannelArguments());
- stub_.set_channel(channel);
- }
-
- void RunServerThread() {
- gpr_thd_id id;
- EXPECT_TRUE(gpr_thd_new(&id, ServerLoop, server_.get(), NULL));
- }
-
- void TearDown() override { server_->Shutdown(); }
-
- std::unique_ptr<AsyncTestServer> server_;
- InternalStub stub_;
-};
-
-TEST_F(End2endTest, NoOpTest) { EXPECT_TRUE(stub_.channel() != nullptr); }
-
-TEST_F(End2endTest, SimpleRpc) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo");
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- Status s =
- stub_.channel()->StartBlockingRpc(method, &context, request, &result);
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, KSequentialSimpleRpcs) {
- int k = 3;
- for (int i = 0; i < k; i++) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo");
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- Status s =
- stub_.channel()->StartBlockingRpc(method, &context, request, &result);
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
- }
-}
-
-TEST_F(End2endTest, OnePingpongBidiStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, nullptr);
- std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream(
- new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- stream->WritesDone();
- EXPECT_FALSE(stream->Read(&result));
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, TwoPingpongBidiStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::BIDI_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, nullptr);
- std::unique_ptr<ClientReaderWriter<EchoRequest, EchoResponse>> stream(
- new ClientReaderWriter<EchoRequest, EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(stream->Write(request));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_EQ(result.message(), request.message());
- stream->WritesDone();
- EXPECT_FALSE(stream->Read(&result));
- Status s = stream->Wait();
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, OnePingpongClientStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::CLIENT_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, nullptr, &result);
- std::unique_ptr<ClientWriter<EchoRequest>> stream(
- new ClientWriter<EchoRequest>(stream_interface));
- EXPECT_TRUE(stream->Write(request));
- stream->WritesDone();
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-TEST_F(End2endTest, OnePingpongServerStream) {
- EchoRequest request;
- request.set_message("hello");
- EchoResponse result;
- ClientContext context;
- RpcMethod method("/foo", RpcMethod::RpcType::SERVER_STREAMING);
- std::chrono::system_clock::time_point deadline =
- std::chrono::system_clock::now() + std::chrono::seconds(10);
- context.set_absolute_deadline(deadline);
- StreamContextInterface* stream_interface =
- stub_.channel()->CreateStream(method, &context, &request, nullptr);
- std::unique_ptr<ClientReader<EchoResponse>> stream(
- new ClientReader<EchoResponse>(stream_interface));
- EXPECT_TRUE(stream->Read(&result));
- EXPECT_FALSE(stream->Read(nullptr));
- Status s = stream->Wait();
- EXPECT_EQ(result.message(), request.message());
- EXPECT_TRUE(s.IsOk());
-}
-
-} // namespace
-} // namespace grpc
-
-int main(int argc, char** argv) {
- grpc_init();
- ::testing::InitGoogleTest(&argc, argv);
- int result = RUN_ALL_TESTS();
- grpc_shutdown();
- return result;
-}
diff --git a/test/cpp/interop/client.cc b/test/cpp/interop/client.cc
index 0fa76f0e02..29bbe4d931 100644
--- a/test/cpp/interop/client.cc
+++ b/test/cpp/interop/client.cc
@@ -248,7 +248,7 @@ void DoRequestStreaming() {
aggregated_payload_size += request_stream_sizes[i];
}
stream->WritesDone();
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
GPR_ASSERT(s.IsOk());
@@ -278,7 +278,7 @@ void DoResponseStreaming() {
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
@@ -311,7 +311,7 @@ void DoResponseStreamingWithSlowConsumer() {
++i;
}
GPR_ASSERT(kNumResponseMessages == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Response streaming done.");
@@ -345,7 +345,7 @@ void DoHalfDuplex() {
++i;
}
GPR_ASSERT(response_stream_sizes.size() == i);
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
}
@@ -378,7 +378,7 @@ void DoPingPong() {
stream->WritesDone();
GPR_ASSERT(!stream->Read(&response));
- grpc::Status s = stream->Wait();
+ grpc::Status s = stream->Finish();
GPR_ASSERT(s.IsOk());
gpr_log(GPR_INFO, "Ping pong streaming done.");
}
diff --git a/test/cpp/interop/server.cc b/test/cpp/interop/server.cc
index 8a6be57929..a8399779b9 100644
--- a/test/cpp/interop/server.cc
+++ b/test/cpp/interop/server.cc
@@ -200,7 +200,7 @@ void RunServer() {
ServerBuilder builder;
builder.AddPort(server_address.str());
- builder.RegisterService(service.service());
+ builder.RegisterService(&service);
if (FLAGS_enable_ssl) {
SslServerCredentialsOptions ssl_opts = {
"", {{test_server1_key, test_server1_cert}}};
diff --git a/test/cpp/qps/server.cc b/test/cpp/qps/server.cc
index c35d9ebdd8..4e1d2cab0e 100644
--- a/test/cpp/qps/server.cc
+++ b/test/cpp/qps/server.cc
@@ -125,7 +125,7 @@ static void RunServer() {
ServerBuilder builder;
builder.AddPort(server_address);
- builder.RegisterService(service.service());
+ builder.RegisterService(&service);
std::unique_ptr<Server> server(builder.BuildAndStart());
gpr_log(GPR_INFO, "Server listening on %s\n", server_address);
diff --git a/tools/run_tests/tests.json b/tools/run_tests/tests.json
index 197dc3b2ba..2ab3f50bb2 100644
--- a/tools/run_tests/tests.json
+++ b/tools/run_tests/tests.json
@@ -283,10 +283,6 @@
},
{
"language": "c++",
- "name": "sync_client_async_server_test"
- },
- {
- "language": "c++",
"name": "thread_pool_test"
},
{