diff options
-rw-r--r-- | src/core/ext/transport/cronet/transport/cronet_api_dummy.c | 3 | ||||
-rw-r--r-- | src/objective-c/GRPCClient/GRPCCall.m | 46 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXImmediateSingleWriter.h | 50 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXImmediateSingleWriter.m | 83 | ||||
-rw-r--r-- | src/objective-c/RxLibrary/GRXWriter+Immediate.m | 3 | ||||
-rw-r--r-- | src/objective-c/tests/InteropTests.m | 64 | ||||
-rw-r--r-- | src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m | 11 | ||||
-rw-r--r-- | src/objective-c/tests/Podfile | 1 | ||||
-rw-r--r-- | src/objective-c/tests/Tests.xcodeproj/project.pbxproj | 1 | ||||
-rw-r--r-- | third_party/objective_c/Cronet/cronet_c_for_grpc.h | 115 |
10 files changed, 320 insertions, 57 deletions
diff --git a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c index 687026c9fd..38755604b9 100644 --- a/src/core/ext/transport/cronet/transport/cronet_api_dummy.c +++ b/src/core/ext/transport/cronet/transport/cronet_api_dummy.c @@ -77,9 +77,8 @@ int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, return 0; } -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { +void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream) { GPR_ASSERT(0); - return 0; } #endif /* GRPC_COMPILE_WITH_CRONET */ diff --git a/src/objective-c/GRPCClient/GRPCCall.m b/src/objective-c/GRPCClient/GRPCCall.m index 44393f6b99..0a10322367 100644 --- a/src/objective-c/GRPCClient/GRPCCall.m +++ b/src/objective-c/GRPCClient/GRPCCall.m @@ -36,6 +36,7 @@ #include <grpc/grpc.h> #include <grpc/support/time.h> #import <RxLibrary/GRXConcurrentWriteable.h> +#import <RxLibrary/GRXImmediateSingleWriter.h> #import "private/GRPCConnectivityMonitor.h" #import "private/GRPCHost.h" @@ -100,6 +101,10 @@ static NSMutableDictionary *callFlags; GRPCCall *_retainSelf; GRPCRequestHeaders *_requestHeaders; + + BOOL _unaryCall; + + NSMutableArray *_unaryOpBatch; } @synthesize state = _state; @@ -157,6 +162,11 @@ static NSMutableDictionary *callFlags; _requestWriter = requestWriter; _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self]; + + if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) { + _unaryCall = true; + _unaryOpBatch = [[NSMutableArray alloc] init]; + } } return self; } @@ -165,6 +175,9 @@ static NSMutableDictionary *callFlags; - (void)finishWithError:(NSError *)errorOrNil { @synchronized(self) { + if (_state == GRXWriterStateFinished) { + return; + } _state = GRXWriterStateFinished; } @@ -254,9 +267,15 @@ static NSMutableDictionary *callFlags; - (void)sendHeaders:(NSDictionary *)headers { // TODO(jcanizales): Add error handlers for async failures - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers - flags:[GRPCCall callFlagsForHost:_host path:_path] - handler:nil]]]; + if (!_unaryCall) { + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMetadata alloc] initWithMetadata:headers + flags:[GRPCCall callFlagsForHost:_host path:_path] + handler:nil]]]; + } else { + [_unaryOpBatch addObject:[[GRPCOpSendMetadata alloc] initWithMetadata:headers + flags:[GRPCCall callFlagsForHost:_host path:_path] + handler:nil]]; + } } #pragma mark GRXWriteable implementation @@ -275,9 +294,14 @@ static NSMutableDictionary *callFlags; } } }; - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message - handler:resumingHandler]] - errorHandler:errorHandler]; + if (!_unaryCall) { + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]] + errorHandler:errorHandler]; + } else { + [_unaryOpBatch addObject:[[GRPCOpSendMessage alloc] initWithMessage:message + handler:resumingHandler]]; + } } - (void)writeValue:(id)value { @@ -302,8 +326,14 @@ static NSMutableDictionary *callFlags; // Only called from the call queue. The error handler will be called from the // network queue if the requests stream couldn't be closed successfully. - (void)finishRequestWithErrorHandler:(void (^)())errorHandler { - [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] - errorHandler:errorHandler]; + if (!_unaryOpBatch) { + [_wrappedCall startBatchWithOperations:@[[[GRPCOpSendClose alloc] init]] + errorHandler:errorHandler]; + } else { + [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]]; + [_wrappedCall startBatchWithOperations:_unaryOpBatch + errorHandler:errorHandler]; + } } - (void)writesFinishedWithError:(NSError *)errorOrNil { diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h new file mode 100644 index 0000000000..0ec788f756 --- /dev/null +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.h @@ -0,0 +1,50 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import <Foundation/Foundation.h> + +#import "GRXImmediateWriter.h" + +/** + * Utility to construct GRXWriter instances from values that are immediately available when + * required. + */ +@interface GRXImmediateSingleWriter : GRXImmediateWriter + +/** + * Returns a writer that sends the passed value to its writeable and then finishes (releasing the + * value). + */ ++ (GRXWriter *)writerWithValue:(id)value; + +@end diff --git a/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m new file mode 100644 index 0000000000..a0d3b771e8 --- /dev/null +++ b/src/objective-c/RxLibrary/GRXImmediateSingleWriter.m @@ -0,0 +1,83 @@ +/* + * + * Copyright 2016, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#import "GRXImmediateSingleWriter.h" + +@implementation GRXImmediateSingleWriter { + id _value; + NSError *_errorOrNil; + id<GRXWriteable> _writeable; +} + +@synthesize state = _state; + +- (instancetype)initWithValue:(id)value error:(NSError *)errorOrNil { + if (self = [super init]) { + _value = value; + _errorOrNil = errorOrNil; + _state = GRXWriterStateNotStarted; + } + return self; +} + ++ (GRXWriter *)writerWithValue:(id)value { + return [[self alloc] initWithValue:value error:nil]; +} + +- (void)startWithWriteable:(id<GRXWriteable>)writeable { + _state = GRXWriterStateStarted; + _writeable = writeable; + [writeable writeValue:_value]; + [self finishWithError:_errorOrNil]; +} + +- (void)finishWithError:(NSError *)errorOrNil { + _state = GRXWriterStateFinished; + _errorOrNil = nil; + _value = nil; + id<GRXWriteable> writeable = _writeable; + _writeable = nil; + [writeable writesFinishedWithError:errorOrNil]; +} + +- (void)setState:(GRXWriterState)newState { + // Manual state transition is not allowed + return; +} + +- (GRXWriter *)map:(id (^)(id))map { + _value = map(_value); + return self; +} + +@end
\ No newline at end of file diff --git a/src/objective-c/RxLibrary/GRXWriter+Immediate.m b/src/objective-c/RxLibrary/GRXWriter+Immediate.m index 1d55eb3529..ea6e681406 100644 --- a/src/objective-c/RxLibrary/GRXWriter+Immediate.m +++ b/src/objective-c/RxLibrary/GRXWriter+Immediate.m @@ -34,6 +34,7 @@ #import "GRXWriter+Immediate.h" #import "GRXImmediateWriter.h" +#import "GRXImmediateSingleWriter.h" @implementation GRXWriter (Immediate) @@ -50,7 +51,7 @@ } + (instancetype)writerWithValue:(id)value { - return [GRXImmediateWriter writerWithValue:value]; + return [GRXImmediateSingleWriter writerWithValue:value]; } + (instancetype)writerWithError:(NSError *)error { diff --git a/src/objective-c/tests/InteropTests.m b/src/objective-c/tests/InteropTests.m index c3935ce1e0..8adf0a6164 100644 --- a/src/objective-c/tests/InteropTests.m +++ b/src/objective-c/tests/InteropTests.m @@ -45,6 +45,8 @@ #import <RemoteTest/Test.pbrpc.h> #import <RxLibrary/GRXBufferedPipe.h> #import <RxLibrary/GRXWriter+Immediate.h> +#import <grpc/support/log.h> +#import <grpc/grpc.h> #define TEST_TIMEOUT 32 @@ -94,15 +96,6 @@ return 0; } -+ (void)setUp { -#ifdef GRPC_COMPILE_WITH_CRONET - // Cronet setup - [Cronet setHttp2Enabled:YES]; - [Cronet start]; - [GRPCCall useCronetWithEngine:[Cronet getGlobalEngine]]; -#endif -} - - (void)setUp { self.continueAfterFailure = NO; @@ -152,6 +145,59 @@ [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil]; } +// TODO (mxyan): Do the same test for chttp2 +#ifdef GRPC_COMPILE_WITH_CRONET +#ifdef GRPC_CRONET_WITH_PACKET_COALESCING + +static bool coalesced_message_and_eos; + +static void log_processor(gpr_log_func_args *args) { + unsigned long file_len = strlen(args->file); + const char suffix[] = "call.c"; + const int suffix_len = sizeof(suffix) - 1; + const char nops[] = "nops=3"; + + if (file_len > suffix_len && + 0 == strcmp(suffix, &args->file[file_len - suffix_len]) && + strstr(args->message, nops)) { + fprintf(stderr, "%s, %s\n", args->file, args->message); + coalesced_message_and_eos = true; + } +} + +- (void)testPacketCoalescing { + gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG); + grpc_tracer_set_enabled("all", 1); + gpr_set_log_function(log_processor); + coalesced_message_and_eos = false; + + XCTAssertNotNil(self.class.host); + __weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"]; + + RMTSimpleRequest *request = [RMTSimpleRequest message]; + request.responseType = RMTPayloadType_Compressable; + request.responseSize = 10; + request.payload.body = [NSMutableData dataWithLength:10]; + + [_service unaryCallWithRequest:request handler:^(RMTSimpleResponse *response, NSError *error) { + XCTAssertNil(error, @"Finished with unexpected error: %@", error); + + RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message]; + expectedResponse.payload.type = RMTPayloadType_Compressable; + expectedResponse.payload.body = [NSMutableData dataWithLength:10]; + XCTAssertEqualObjects(response, expectedResponse); + + XCTAssert(coalesced_message_and_eos); + + [expectation fulfill]; + }]; + + [self waitForExpectationsWithTimeout:16 handler:nil]; +} + +#endif +#endif + - (void)test4MBResponsesAreAccepted { XCTAssertNotNil(self.class.host); __weak XCTestExpectation *expectation = [self expectationWithDescription:@"MaxResponseSize"]; diff --git a/src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m b/src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m index fab8ad8d25..793b71a9be 100644 --- a/src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m +++ b/src/objective-c/tests/InteropTestsRemoteWithCronet/InteropTestsRemoteWithCronet.m @@ -33,6 +33,9 @@ #import <GRPCClient/GRPCCall+Tests.h> +#import <Cronet/Cronet.h> +#import <GRPCClient/GRPCCall+Cronet.h> + #import "InteropTests.h" static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.googleapis.com"; @@ -43,6 +46,14 @@ static NSString * const kRemoteSSLHost = @"grpc-test.sandbox.googleapis.com"; @implementation InteropTestsRemoteWithCronet ++ (void)setUp { + // Cronet setup + [Cronet setHttp2Enabled:YES]; + [Cronet start]; + [GRPCCall useCronetWithEngine:[Cronet getGlobalEngine]]; + [Cronet startNetLogToFile:@"Documents/cronet_netlog.json" logBytes:YES]; +} + + (NSString *)host { return kRemoteSSLHost; } diff --git a/src/objective-c/tests/Podfile b/src/objective-c/tests/Podfile index 5785b976f2..d1ef0886fe 100644 --- a/src/objective-c/tests/Podfile +++ b/src/objective-c/tests/Podfile @@ -92,6 +92,7 @@ post_install do |installer| # GPR_UNREACHABLE_CODE causes "Control may reach end of non-void # function" warning config.build_settings['GCC_WARN_ABOUT_RETURN_TYPE'] = 'NO' + config.build_settings['GCC_PREPROCESSOR_DEFINITIONS'] = '$(inherited) COCOAPODS=1 GRPC_CRONET_WITH_PACKET_COALESCING=1' end end diff --git a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj index c4a6567ae0..8455e71b02 100644 --- a/src/objective-c/tests/Tests.xcodeproj/project.pbxproj +++ b/src/objective-c/tests/Tests.xcodeproj/project.pbxproj @@ -1296,6 +1296,7 @@ "$(inherited)", "GPB_USE_PROTOBUF_FRAMEWORK_IMPORTS=1", "GRPC_COMPILE_WITH_CRONET=1", + "GRPC_CRONET_WITH_PACKET_COALESCING=1", ); INFOPLIST_FILE = InteropTestsRemoteWithCronet/Info.plist; IPHONEOS_DEPLOYMENT_TARGET = 9.3; diff --git a/third_party/objective_c/Cronet/cronet_c_for_grpc.h b/third_party/objective_c/Cronet/cronet_c_for_grpc.h index 15a511aebd..3d58a8370e 100644 --- a/third_party/objective_c/Cronet/cronet_c_for_grpc.h +++ b/third_party/objective_c/Cronet/cronet_c_for_grpc.h @@ -5,6 +5,8 @@ #ifndef COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_ #define COMPONENTS_CRONET_IOS_CRONET_C_FOR_GRPC_H_ +#define CRONET_EXPORT __attribute__((visibility("default"))) + #ifdef __cplusplus extern "C" { #endif @@ -15,12 +17,10 @@ extern "C" { /* Opaque object representing Cronet Engine. Created and configured outside * of this API to facilitate sharing with other components */ -typedef struct cronet_engine { void* obj; } cronet_engine; - -void cronet_engine_add_quic_hint(cronet_engine* engine, - const char* host, - int port, - int alternate_port); +typedef struct cronet_engine { + void* obj; + void* annotation; +} cronet_engine; /* Cronet Bidirectional Stream API */ @@ -45,11 +45,12 @@ typedef struct cronet_bidirectional_stream_header_array { /* Set of callbacks used to receive callbacks from bidirectional stream. */ typedef struct cronet_bidirectional_stream_callback { - /* Invoked when request headers are sent. Indicates that stream has initiated - * the request. Consumer may call cronet_bidirectional_stream_write() to start - * writing data. + /* Invoked when the stream is ready for reading and writing. + * Consumer may call cronet_bidirectional_stream_read() to start reading data. + * Consumer may call cronet_bidirectional_stream_write() to start writing + * data. */ - void (*on_request_headers_sent)(cronet_bidirectional_stream* stream); + void (*on_stream_ready)(cronet_bidirectional_stream* stream); /* Invoked when initial response headers are received. * Consumer must call cronet_bidirectional_stream_read() to start reading. @@ -67,20 +68,19 @@ typedef struct cronet_bidirectional_stream_callback { * It may be invoked after on_response_trailers_received()}, if there was * pending read data before trailers were received. * - * If count is 0, it means the remote side has signaled that it will send no - * more data; future calls to cronet_bidirectional_stream_read() will result - * in the on_data_read() callback or on_succeded() callback if + * If |bytes_read| is 0, it means the remote side has signaled that it will + * send no more data; future calls to cronet_bidirectional_stream_read() + * will result in the on_data_read() callback or on_succeded() callback if * cronet_bidirectional_stream_write() was invoked with end_of_stream set to * true. */ void (*on_read_completed)(cronet_bidirectional_stream* stream, char* data, - int count); + int bytes_read); /** * Invoked when all data passed to cronet_bidirectional_stream_write() is - * sent. - * To continue writing, call cronet_bidirectional_stream_write(). + * sent. To continue writing, call cronet_bidirectional_stream_write(). */ void (*on_write_completed)(cronet_bidirectional_stream* stream, const char* data); @@ -117,7 +117,7 @@ typedef struct cronet_bidirectional_stream_callback { void (*on_canceled)(cronet_bidirectional_stream* stream); } cronet_bidirectional_stream_callback; -/* Create a new stream object that uses |engine| and |callback|. All stream +/* Creates a new stream object that uses |engine| and |callback|. All stream * tasks are performed asynchronously on the |engine| network thread. |callback| * methods are invoked synchronously on the |engine| network thread, but must * not run tasks on the current thread to prevent blocking networking operations @@ -129,6 +129,7 @@ typedef struct cronet_bidirectional_stream_callback { * * Both |calback| and |engine| must remain valid until stream is destroyed. */ +CRONET_EXPORT cronet_bidirectional_stream* cronet_bidirectional_stream_create( cronet_engine* engine, void* annotation, @@ -136,15 +137,40 @@ cronet_bidirectional_stream* cronet_bidirectional_stream_create( /* TBD: The following methods return int. Should it be a custom type? */ -/* Destroy stream object. Destroy could be called from any thread, including +/* Destroys stream object. Destroy could be called from any thread, including * network thread, but is posted, so |stream| is valid until calling task is * complete. */ +CRONET_EXPORT int cronet_bidirectional_stream_destroy(cronet_bidirectional_stream* stream); -/* Start the stream by sending request to |url| using |method| and |headers|. If - * |end_of_stream| is true, then no data is expected to be written. +/** + * Disables or enables auto flush. By default, data is flushed after + * every cronet_bidirectional_stream_write(). If the auto flush is disabled, + * the client should explicitly call cronet_bidirectional_stream_flush to flush + * the data. + */ +CRONET_EXPORT void cronet_bidirectional_stream_disable_auto_flush( + cronet_bidirectional_stream* stream, + bool disable_auto_flush); + +/** + * Delays sending request headers until cronet_bidirectional_stream_flush() + * is called. This flag is currently only respected when QUIC is negotiated. + * When true, QUIC will send request header frame along with data frame(s) + * as a single packet when possible. + */ +CRONET_EXPORT +void cronet_bidirectional_stream_delay_request_headers_until_flush( + cronet_bidirectional_stream* stream, + bool delay_headers_until_flush); + +/* Starts the stream by sending request to |url| using |method| and |headers|. + * If |end_of_stream| is true, then no data is expected to be written. The + * |method| is HTTP verb, with PUT having a special meaning to mark idempotent + * request, which could use QUIC 0-RTT. */ +CRONET_EXPORT int cronet_bidirectional_stream_start( cronet_bidirectional_stream* stream, const char* url, @@ -153,46 +179,61 @@ int cronet_bidirectional_stream_start( const cronet_bidirectional_stream_header_array* headers, bool end_of_stream); -/* Read response data into |buffer| of |capacity| length. Must only be called at - * most once in response to each invocation of the - * on_response_headers_received() and on_read_completed() methods of the - * cronet_bidirectional_stream_callback. - * Each call will result in an invocation of one of the callback's - * on_read_completed method if data is read, its on_succeeded() method if - * the stream is closed, or its on_failed() method if there's an error. +/* Reads response data into |buffer| of |capacity| length. Must only be called + * at most once in response to each invocation of the + * on_stream_ready()/on_response_headers_received() and on_read_completed() + * methods of the cronet_bidirectional_stream_callback. + * Each call will result in an invocation of the callback's + * on_read_completed() method if data is read, or its on_failed() method if + * there's an error. The callback's on_succeeded() method is also invoked if + * there is no more data to read and |end_of_stream| was previously sent. */ +CRONET_EXPORT int cronet_bidirectional_stream_read(cronet_bidirectional_stream* stream, char* buffer, int capacity); -/* Read response data into |buffer| of |capacity| length. Must only be called at - * most once in response to each invocation of the - * on_response_headers_received() and on_read_completed() methods of the - * cronet_bidirectional_stream_callback. - * Each call will result in an invocation of one of the callback's - * on_read_completed method if data is read, its on_succeeded() method if - * the stream is closed, or its on_failed() method if there's an error. +/* Writes request data from |buffer| of |buffer_length| length. If auto flush is + * disabled, data will be sent only after cronet_bidirectional_stream_flush() is + * called. + * Each call will result in an invocation the callback's on_write_completed() + * method if data is sent, or its on_failed() method if there's an error. + * The callback's on_succeeded() method is also invoked if |end_of_stream| is + * set and all response data has been read. */ +CRONET_EXPORT int cronet_bidirectional_stream_write(cronet_bidirectional_stream* stream, const char* buffer, - int count, + int buffer_length, bool end_of_stream); +/** + * Flushes pending writes. This method should not be called before invocation of + * on_stream_ready() method of the cronet_bidirectional_stream_callback. + * For each previously called cronet_bidirectional_stream_write() + * a corresponding on_write_completed() callback will be invoked when the buffer + * is sent. + */ +CRONET_EXPORT +void cronet_bidirectional_stream_flush(cronet_bidirectional_stream* stream); + /* Cancels the stream. Can be called at any time after * cronet_bidirectional_stream_start(). The on_canceled() method of * cronet_bidirectional_stream_callback will be invoked when cancelation * is complete and no further callback methods will be invoked. If the * stream has completed or has not started, calling * cronet_bidirectional_stream_cancel() has no effect and on_canceled() will not - * be invoked. At most one callback method may be invoked after + * be invoked. At most one callback method may be invoked after * cronet_bidirectional_stream_cancel() has completed. */ -int cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream); +CRONET_EXPORT +void cronet_bidirectional_stream_cancel(cronet_bidirectional_stream* stream); /* Returns true if the |stream| was successfully started and is now done * (succeeded, canceled, or failed). * Returns false if the |stream| stream is not yet started or is in progress. */ +CRONET_EXPORT bool cronet_bidirectional_stream_is_done(cronet_bidirectional_stream* stream); #ifdef __cplusplus |