1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*is % allowed in string
*/
#include <inttypes.h>
#include <ctime>
#include <memory>
#include <string>
#include <gflags/gflags.h>
#include <grpc/support/log.h>
#include <grpc/support/port_platform.h>
#include <grpcpp/grpcpp.h>
#include "src/cpp/thread_manager/thread_manager.h"
#include "test/cpp/util/test_config.h"
namespace grpc {
class ThreadManagerTest final : public grpc::ThreadManager {
public:
ThreadManagerTest()
: ThreadManager(kMinPollers, kMaxPollers),
num_do_work_(0),
num_poll_for_work_(0),
num_work_found_(0) {}
grpc::ThreadManager::WorkStatus PollForWork(void** tag, bool* ok) override;
void DoWork(void* tag, bool ok) override;
void PerformTest();
private:
void SleepForMs(int sleep_time_ms);
static const int kMinPollers = 2;
static const int kMaxPollers = 10;
static const int kPollingTimeoutMsec = 10;
static const int kDoWorkDurationMsec = 1;
// PollForWork will return SHUTDOWN after these many number of invocations
static const int kMaxNumPollForWork = 50;
gpr_atm num_do_work_; // Number of calls to DoWork
gpr_atm num_poll_for_work_; // Number of calls to PollForWork
gpr_atm num_work_found_; // Number of times WORK_FOUND was returned
};
void ThreadManagerTest::SleepForMs(int duration_ms) {
gpr_timespec sleep_time =
gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
gpr_time_from_millis(duration_ms, GPR_TIMESPAN));
gpr_sleep_until(sleep_time);
}
grpc::ThreadManager::WorkStatus ThreadManagerTest::PollForWork(void** tag,
bool* ok) {
int call_num = gpr_atm_no_barrier_fetch_add(&num_poll_for_work_, 1);
if (call_num >= kMaxNumPollForWork) {
Shutdown();
return SHUTDOWN;
}
// Simulate "polling for work" by sleeping for sometime
SleepForMs(kPollingTimeoutMsec);
*tag = nullptr;
*ok = true;
// Return timeout roughly 1 out of every 3 calls
if (call_num % 3 == 0) {
return TIMEOUT;
} else {
gpr_atm_no_barrier_fetch_add(&num_work_found_, 1);
return WORK_FOUND;
}
}
void ThreadManagerTest::DoWork(void* tag, bool ok) {
gpr_atm_no_barrier_fetch_add(&num_do_work_, 1);
SleepForMs(kDoWorkDurationMsec); // Simulate doing work by sleeping
}
void ThreadManagerTest::PerformTest() {
// Initialize() starts the ThreadManager
Initialize();
// Wait for all the threads to gracefully terminate
Wait();
// The number of times DoWork() was called is equal to the number of times
// WORK_FOUND was returned
gpr_log(GPR_DEBUG, "DoWork() called %" PRIdPTR " times",
gpr_atm_no_barrier_load(&num_do_work_));
GPR_ASSERT(gpr_atm_no_barrier_load(&num_do_work_) ==
gpr_atm_no_barrier_load(&num_work_found_));
}
} // namespace grpc
int main(int argc, char** argv) {
std::srand(std::time(nullptr));
grpc::testing::InitTest(&argc, &argv, true);
grpc::ThreadManagerTest test_rpc_manager;
test_rpc_manager.PerformTest();
return 0;
}
|