aboutsummaryrefslogtreecommitdiffhomepage
path: root/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
diff options
context:
space:
mode:
Diffstat (limited to 'test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm')
-rw-r--r--test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm344
1 files changed, 344 insertions, 0 deletions
diff --git a/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
new file mode 100644
index 0000000000..fbc34c74d6
--- /dev/null
+++ b/test/core/iomgr/ios/CFStreamTests/CFStreamEndpointTests.mm
@@ -0,0 +1,344 @@
+/*
+ *
+ * Copyright 2018 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#import <XCTest/XCTest.h>
+
+#include "src/core/lib/iomgr/port.h"
+
+#ifdef GRPC_CFSTREAM
+
+#include <netinet/in.h>
+
+#include <grpc/impl/codegen/sync.h>
+#include <grpc/support/sync.h>
+
+#include "src/core/lib/iomgr/endpoint.h"
+#include "src/core/lib/iomgr/resolve_address.h"
+#include "src/core/lib/iomgr/tcp_client.h"
+#include "test/core/util/test_config.h"
+
+static const int kConnectTimeout = 5;
+static const int kWriteTimeout = 5;
+static const int kReadTimeout = 5;
+
+static const int kBufferSize = 10000;
+
+static const int kRunLoopTimeout = 1;
+
+static void set_atm(void *arg, grpc_error *error) {
+ gpr_atm *p = static_cast<gpr_atm *>(arg);
+ gpr_atm_full_cas(p, -1, reinterpret_cast<gpr_atm>(error));
+}
+
+static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
+ *atm = -1;
+ GRPC_CLOSURE_INIT(closure, set_atm, static_cast<void *>(atm), grpc_schedule_on_exec_ctx);
+}
+
+static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
+ size_t buffer_len) {
+ if (slices->length != buffer_len) {
+ return false;
+ }
+
+ for (int i = 0; i < slices->count; i++) {
+ grpc_slice slice = slices->slices[i];
+ if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
+ return false;
+ }
+ buffer += GRPC_SLICE_LENGTH(slice);
+ }
+
+ return true;
+}
+
+@interface CFStreamEndpointTests : XCTestCase
+
+@end
+
+@implementation CFStreamEndpointTests {
+ grpc_endpoint *ep_;
+ int svr_fd_;
+}
+
+- (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
+ grpc_core::ExecCtx::Get()->Flush();
+
+ NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kConnectTimeout];
+ while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
+ NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
+ [[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:deadline];
+ }
+
+ return (gpr_atm_acq_load(event) != -1);
+}
+
++ (void)setUp {
+ grpc_init();
+}
+
++ (void)tearDown {
+ grpc_shutdown();
+}
+
+- (void)setUp {
+ self.continueAfterFailure = NO;
+
+ // Set up CFStream connection before testing the endpoint
+
+ grpc_core::ExecCtx exec_ctx;
+
+ grpc_resolved_address resolved_addr;
+ struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
+ int svr_fd;
+ int r;
+ gpr_atm connected = -1;
+ grpc_closure done;
+
+ gpr_log(GPR_DEBUG, "test_succeeds");
+
+ memset(&resolved_addr, 0, sizeof(resolved_addr));
+ resolved_addr.len = sizeof(struct sockaddr_in);
+ addr->sin_family = AF_INET;
+
+ /* create a dummy server */
+ svr_fd = socket(AF_INET, SOCK_STREAM, 0);
+ XCTAssertGreaterThanOrEqual(svr_fd, 0);
+ XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
+ XCTAssertEqual(listen(svr_fd, 1), 0);
+
+ /* connect to it */
+ XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
+ init_event_closure(&done, &connected);
+ grpc_tcp_client_connect(&done, &ep_, nullptr, nullptr, &resolved_addr, GRPC_MILLIS_INF_FUTURE);
+
+ /* await the connection */
+ do {
+ resolved_addr.len = sizeof(addr);
+ r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
+ reinterpret_cast<socklen_t *>(&resolved_addr.len));
+ } while (r == -1 && errno == EINTR);
+ XCTAssertGreaterThanOrEqual(r, 0);
+ svr_fd_ = r;
+
+ /* wait for the connection callback to finish */
+ XCTAssertEqual([self waitForEvent:&connected timeout:kConnectTimeout], YES);
+ XCTAssertEqual(reinterpret_cast<grpc_error *>(connected), GRPC_ERROR_NONE);
+}
+
+- (void)tearDown {
+ grpc_core::ExecCtx exec_ctx;
+ close(svr_fd_);
+ grpc_endpoint_destroy(ep_);
+}
+
+- (void)testReadWrite {
+ grpc_core::ExecCtx exec_ctx;
+
+ gpr_atm read;
+ grpc_closure read_done;
+ grpc_slice_buffer read_slices;
+ grpc_slice_buffer read_one_slice;
+ gpr_atm write;
+ grpc_closure write_done;
+ grpc_slice_buffer write_slices;
+
+ grpc_slice slice;
+ char write_buffer[kBufferSize];
+ char read_buffer[kBufferSize];
+ size_t recv_size = 0;
+
+ grpc_slice_buffer_init(&write_slices);
+ slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
+ grpc_slice_buffer_add(&write_slices, slice);
+ init_event_closure(&write_done, &write);
+ grpc_endpoint_write(ep_, &write_slices, &write_done);
+
+ XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
+ XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);
+
+ while (recv_size < kBufferSize) {
+ ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
+ XCTAssertGreaterThanOrEqual(size, 0);
+ recv_size += size;
+ }
+
+ XCTAssertEqual(recv_size, kBufferSize);
+ XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
+ ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
+ XCTAssertGreaterThanOrEqual(send_size, 0);
+
+ grpc_slice_buffer_init(&read_slices);
+ grpc_slice_buffer_init(&read_one_slice);
+ while (read_slices.length < kBufferSize) {
+ init_event_closure(&read_done, &read);
+ grpc_endpoint_read(ep_, &read_one_slice, &read_done);
+ XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
+ XCTAssertEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
+ grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
+ XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
+ }
+ XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
+
+ grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref(&read_slices);
+ grpc_slice_buffer_reset_and_unref(&write_slices);
+ grpc_slice_buffer_reset_and_unref(&read_one_slice);
+}
+
+- (void)testShutdownBeforeRead {
+ grpc_core::ExecCtx exec_ctx;
+
+ gpr_atm read;
+ grpc_closure read_done;
+ grpc_slice_buffer read_slices;
+ gpr_atm write;
+ grpc_closure write_done;
+ grpc_slice_buffer write_slices;
+
+ grpc_slice slice;
+ char write_buffer[kBufferSize];
+ char read_buffer[kBufferSize];
+ size_t recv_size = 0;
+
+ grpc_slice_buffer_init(&read_slices);
+ init_event_closure(&read_done, &read);
+ grpc_endpoint_read(ep_, &read_slices, &read_done);
+
+ grpc_slice_buffer_init(&write_slices);
+ slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
+ grpc_slice_buffer_add(&write_slices, slice);
+ init_event_closure(&write_done, &write);
+ grpc_endpoint_write(ep_, &write_slices, &write_done);
+
+ XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
+ XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);
+
+ while (recv_size < kBufferSize) {
+ ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
+ XCTAssertGreaterThanOrEqual(size, 0);
+ recv_size += size;
+ }
+
+ XCTAssertEqual(recv_size, kBufferSize);
+ XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
+
+ XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], NO);
+
+ grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
+
+ grpc_core::ExecCtx::Get()->Flush();
+ XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
+ XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
+
+ grpc_slice_buffer_reset_and_unref(&read_slices);
+ grpc_slice_buffer_reset_and_unref(&write_slices);
+}
+
+- (void)testRemoteClosed {
+ grpc_core::ExecCtx exec_ctx;
+
+ gpr_atm read;
+ grpc_closure read_done;
+ grpc_slice_buffer read_slices;
+ gpr_atm write;
+ grpc_closure write_done;
+ grpc_slice_buffer write_slices;
+
+ grpc_slice slice;
+ char write_buffer[kBufferSize];
+ char read_buffer[kBufferSize];
+ size_t recv_size = 0;
+
+ init_event_closure(&read_done, &read);
+ grpc_slice_buffer_init(&read_slices);
+ grpc_endpoint_read(ep_, &read_slices, &read_done);
+
+ grpc_slice_buffer_init(&write_slices);
+ slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
+ grpc_slice_buffer_add(&write_slices, slice);
+ init_event_closure(&write_done, &write);
+ grpc_endpoint_write(ep_, &write_slices, &write_done);
+
+ XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
+ XCTAssertEqual(reinterpret_cast<grpc_error *>(write), GRPC_ERROR_NONE);
+
+ while (recv_size < kBufferSize) {
+ ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
+ XCTAssertGreaterThanOrEqual(size, 0);
+ recv_size += size;
+ }
+
+ XCTAssertEqual(recv_size, kBufferSize);
+ XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
+
+ close(svr_fd_);
+
+ XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
+ XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
+
+ grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref(&read_slices);
+ grpc_slice_buffer_reset_and_unref(&write_slices);
+}
+
+- (void)testRemoteReset {
+ grpc_core::ExecCtx exec_ctx;
+
+ gpr_atm read;
+ grpc_closure read_done;
+ grpc_slice_buffer read_slices;
+
+ init_event_closure(&read_done, &read);
+ grpc_slice_buffer_init(&read_slices);
+ grpc_endpoint_read(ep_, &read_slices, &read_done);
+
+ struct linger so_linger;
+ so_linger.l_onoff = 1;
+ so_linger.l_linger = 0;
+ setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
+
+ close(svr_fd_);
+
+ XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
+ XCTAssertNotEqual(reinterpret_cast<grpc_error *>(read), GRPC_ERROR_NONE);
+
+ grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
+ grpc_slice_buffer_reset_and_unref(&read_slices);
+}
+
+@end
+
+#else // GRPC_CFSTREAM
+
+// Dummy test suite
+@interface CFStreamEndpointTests : XCTestCase
+@end
+
+@implementation CFStreamEndpointTests
+- (void)setUp {
+ [super setUp];
+}
+
+- (void)tearDown {
+ [super tearDown];
+}
+
+@end
+
+#endif // GRPC_CFSTREAM