aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Michael Lumish <mlumish@google.com>2015-05-19 10:30:16 -0700
committerGravatar Michael Lumish <mlumish@google.com>2015-05-19 10:30:16 -0700
commit9a688dfb06226b85bd6781427b6c164e72784798 (patch)
treed673026a30cdd4f735e7e0edbf931ebcb1aea946
parent626b4d15bd73707b9cd759067b1346ab2bd81ec8 (diff)
parent42e47b462ea4c45496e2367e3bfc168ab433600b (diff)
Merge pull request #1647 from jcanizales/buffered-pipe-and-ping-pong-test
Adds GRXBufferedPipe and the PingPong interop test using it
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.h59
-rw-r--r--src/objective-c/RxLibrary/GRXBufferedPipe.m146
-rw-r--r--src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m98
3 files changed, 292 insertions, 11 deletions
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.h b/src/objective-c/RxLibrary/GRXBufferedPipe.h
new file mode 100644
index 0000000000..4147362ba4
--- /dev/null
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.h
@@ -0,0 +1,59 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import <Foundation/Foundation.h>
+
+#import "GRXWriteable.h"
+#import "GRXWriter.h"
+
+// A buffered pipe is a Writeable that also acts as a Writer (to whichever other writeable is passed
+// to -startWithWriteable:).
+// Once it is started, whatever values are written into it (via -didReceiveValue:) will be
+// propagated immediately, unless flow control prevents it.
+// If it is throttled and keeps receiving values, as well as if it receives values before being
+// started, it will buffer them and propagate them in order as soon as its state becomes
+// GRXWriterStateStarted.
+// If it receives an error (via -didFinishWithError:), it will drop any buffered values and
+// propagate the error immediately.
+//
+// Beware that a pipe of this type can't prevent receiving more values when it is paused (for
+// example if used to write data to a congested network connection). Because in such situations the
+// pipe will keep buffering all data written to it, your application could run out of memory and
+// crash. If you want to react to flow control signals to prevent that, instead of using this class
+// you can implement an object that conforms to GRXWriter.
+@interface GRXBufferedPipe : NSObject<GRXWriteable, GRXWriter>
+
+// Convenience constructor.
++ (instancetype)pipe;
+
+@end
diff --git a/src/objective-c/RxLibrary/GRXBufferedPipe.m b/src/objective-c/RxLibrary/GRXBufferedPipe.m
new file mode 100644
index 0000000000..3ef470f89f
--- /dev/null
+++ b/src/objective-c/RxLibrary/GRXBufferedPipe.m
@@ -0,0 +1,146 @@
+/*
+ *
+ * Copyright 2015, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#import "GRXBufferedPipe.h"
+
+@implementation GRXBufferedPipe {
+ id<GRXWriteable> _writeable;
+ NSMutableArray *_queue;
+ BOOL _inputIsFinished;
+ NSError *_errorOrNil;
+}
+
+@synthesize state = _state;
+
++ (instancetype)pipe {
+ return [[self alloc] init];
+}
+
+- (instancetype)init {
+ if (self = [super init]) {
+ _queue = [NSMutableArray array];
+ _state = GRXWriterStateNotStarted;
+ }
+ return self;
+}
+
+- (id)popValue {
+ id value = _queue[0];
+ [_queue removeObjectAtIndex:0];
+ return value;
+}
+
+- (void)writeBufferUntilPausedOrStopped {
+ while (_state == GRXWriterStateStarted && _queue.count > 0) {
+ [_writeable didReceiveValue:[self popValue]];
+ }
+ if (_inputIsFinished && _queue.count == 0) {
+ // Our writer finished normally while we were paused or not-started-yet.
+ [self finishWithError:_errorOrNil];
+ }
+}
+
+#pragma mark GRXWriteable implementation
+
+// Returns whether events can be simply propagated to the other end of the pipe.
+- (BOOL)shouldFastForward {
+ return _state == GRXWriterStateStarted && _queue.count == 0;
+}
+
+- (void)didReceiveValue:(id)value {
+ if (self.shouldFastForward) {
+ // Skip the queue.
+ [_writeable didReceiveValue:value];
+ } else {
+ // Even if we're paused and with enqueued values, we can't excert back-pressure to our writer.
+ // So just buffer the new value.
+ // We need a copy, so that it doesn't mutate before it's written at the other end of the pipe.
+ if ([value respondsToSelector:@selector(copy)]) {
+ value = [value copy];
+ }
+ [_queue addObject:value];
+ }
+}
+
+- (void)didFinishWithError:(NSError *)errorOrNil {
+ _inputIsFinished = YES;
+ _errorOrNil = errorOrNil;
+ if (errorOrNil || self.shouldFastForward) {
+ // No need to write pending values.
+ [self finishWithError:_errorOrNil];
+ }
+}
+
+#pragma mark GRXWriter implementation
+
+- (void)setState:(GRXWriterState)newState {
+ // Manual transitions are only allowed from the started or paused states.
+ if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
+ return;
+ }
+
+ switch (newState) {
+ case GRXWriterStateFinished:
+ _state = newState;
+ _queue = nil;
+ // Per GRXWriter's contract, setting the state to Finished manually means one doesn't wish the
+ // writeable to be messaged anymore.
+ _writeable = nil;
+ return;
+ case GRXWriterStatePaused:
+ _state = newState;
+ return;
+ case GRXWriterStateStarted:
+ if (_state == GRXWriterStatePaused) {
+ _state = newState;
+ [self writeBufferUntilPausedOrStopped];
+ }
+ return;
+ case GRXWriterStateNotStarted:
+ return;
+ }
+}
+
+- (void)startWithWriteable:(id<GRXWriteable>)writeable {
+ _state = GRXWriterStateStarted;
+ _writeable = writeable;
+ [self writeBufferUntilPausedOrStopped];
+}
+
+- (void)finishWithError:(NSError *)errorOrNil {
+ id<GRXWriteable> writeable = _writeable;
+ self.state = GRXWriterStateFinished;
+ [writeable didFinishWithError:errorOrNil];
+}
+
+@end
diff --git a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
index f489229834..7cebc0c2a7 100644
--- a/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
+++ b/src/objective-c/examples/Sample/SampleTests/RemoteProtoTests.m
@@ -36,13 +36,46 @@
#import <UIKit/UIKit.h>
#import <XCTest/XCTest.h>
-#import <gRPC/ProtoRPC.h>
#import <gRPC/GRXWriter+Immediate.h>
+#import <gRPC/GRXBufferedPipe.h>
+#import <gRPC/ProtoRPC.h>
#import <RemoteTest/Empty.pbobjc.h>
#import <RemoteTest/Messages.pbobjc.h>
#import <RemoteTest/Test.pbobjc.h>
#import <RemoteTest/Test.pbrpc.h>
+// Convenience constructors for the generated proto messages:
+
+@interface RMTStreamingOutputCallRequest (Constructors)
++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
+ requestedResponseSize:(NSNumber *)responseSize;
+@end
+
+@implementation RMTStreamingOutputCallRequest (Constructors)
++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
+ requestedResponseSize:(NSNumber *)responseSize {
+ RMTStreamingOutputCallRequest *request = [self message];
+ RMTResponseParameters *parameters = [RMTResponseParameters message];
+ parameters.size = responseSize.integerValue;
+ [request.responseParametersArray addObject:parameters];
+ request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
+ return request;
+}
+@end
+
+@interface RMTStreamingOutputCallResponse (Constructors)
++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
+@end
+
+@implementation RMTStreamingOutputCallResponse (Constructors)
++ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
+ RMTStreamingOutputCallResponse * response = [self message];
+ response.payload.type = RMTPayloadType_Compressable;
+ response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
+ return response;
+}
+@end
+
@interface RemoteProtoTests : XCTestCase
@end
@@ -70,7 +103,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:2. handler:nil];
+ [self waitForExpectationsWithTimeout:2 handler:nil];
}
- (void)testLargeUnaryRPC {
@@ -92,7 +125,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:4. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testClientStreamingRPC {
@@ -124,7 +157,7 @@
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:4. handler:nil];
+ [self waitForExpectationsWithTimeout:4 handler:nil];
}
- (void)testServerStreamingRPC {
@@ -149,10 +182,7 @@
if (response) {
XCTAssertLessThan(index, 4, @"More than 4 responses received.");
- RMTStreamingOutputCallResponse * expected = [RMTStreamingOutputCallResponse message];
- expected.payload.type = RMTPayloadType_Compressable;
- int expectedSize = [expectedSizes[index] unsignedIntegerValue];
- expected.payload.body = [NSMutableData dataWithLength:expectedSize];
+ id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:expectedSizes[index]];
XCTAssertEqualObjects(response, expected);
index += 1;
}
@@ -166,6 +196,49 @@
[self waitForExpectationsWithTimeout:4 handler:nil];
}
+- (void)testPingPongRPC {
+ __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];
+
+ NSArray *requests = @[@27182, @8, @1828, @45904];
+ NSArray *responses = @[@31415, @9, @2653, @58979];
+
+ GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
+
+ __block int index = 0;
+
+ id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
+ requestedResponseSize:responses[index]];
+ [requestsBuffer didReceiveValue:request];
+
+ [_service fullDuplexCallWithRequestsWriter:requestsBuffer
+ handler:^(BOOL done,
+ RMTStreamingOutputCallResponse *response,
+ NSError *error) {
+ XCTAssertNil(error, @"Finished with unexpected error: %@", error);
+ XCTAssertTrue(done || response, @"Event handler called without an event.");
+
+ if (response) {
+ XCTAssertLessThan(index, 4, @"More than 4 responses received.");
+ id expected = [RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
+ XCTAssertEqualObjects(response, expected);
+ index += 1;
+ if (index < 4) {
+ id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
+ requestedResponseSize:responses[index]];
+ [requestsBuffer didReceiveValue:request];
+ } else {
+ [requestsBuffer didFinishWithError:nil];
+ }
+ }
+
+ if (done) {
+ XCTAssertEqual(index, 4, @"Received %i responses instead of 4.", index);
+ [expectation fulfill];
+ }
+ }];
+ [self waitForExpectationsWithTimeout:2 handler:nil];
+}
+
- (void)testEmptyStreamRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
[_service fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter]
@@ -176,13 +249,16 @@
XCTAssert(done, @"Unexpected response: %@", response);
[expectation fulfill];
}];
- [self waitForExpectationsWithTimeout:4 handler:nil];
+ [self waitForExpectationsWithTimeout:2 handler:nil];
}
- (void)testCancelAfterBeginRPC {
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];
- // TODO(mlumish): change to writing that blocks instead of writing
- ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:[GRXWriter emptyWriter]
+
+ // A buffered pipe to which we never write any value acts as a writer that just hangs.
+ GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
+
+ ProtoRPC *call = [_service RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
handler:^(RMTStreamingInputCallResponse *response,
NSError *error) {
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);