aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/cpp/thread_manager/thread_manager.h
blob: 6f0bd17c5fe10c82f091d86540a06dbd818efe39 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_INTERNAL_CPP_THREAD_MANAGER_H
#define GRPC_INTERNAL_CPP_THREAD_MANAGER_H

#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>

#include <grpcpp/support/config.h>

#include "src/core/lib/gprpp/thd.h"
#include "src/core/lib/iomgr/resource_quota.h"

namespace grpc {

// Abstract base class for a manager of worker threads that poll for work and
// then perform it. Derived classes supply the polling and the work itself by
// implementing PollForWork() and DoWork(); ThreadManager owns the bookkeeping
// declared below (poller/thread counts, shutdown state and the list of
// completed threads waiting to be joined).
class ThreadManager {
 public:
  // Parameters:
  //  - name: NOTE(review): presumably a label for this manager (or for the
  //    resource user it creates) -- confirm in thread_manager.cc
  //  - resource_quota: the quota object (holding the max thread quota) against
  //    which new threads are requested; see resource_user_ below
  //  - min_pollers / max_pollers: the minimum and maximum number of threads
  //    that should be doing polling
  explicit ThreadManager(const char* name, grpc_resource_quota* resource_quota,
                         int min_pollers, int max_pollers);
  virtual ~ThreadManager();

  // Initializes and starts the ThreadManager's threads
  void Initialize();

  // The return type of PollForWork() function
  enum WorkStatus { WORK_FOUND, SHUTDOWN, TIMEOUT };

  // "Polls" for new work.
  // If the return value is WORK_FOUND:
  //  - The implementation of PollForWork() MAY set some opaque identifier to
  //    (identify the work item found) via the '*tag' parameter
  //  - The implementation MUST set the value of 'ok' to 'true' or 'false'. A
  //    value of 'false' indicates some implementation specific error (that is
  //    neither SHUTDOWN nor TIMEOUT)
  //  - ThreadManager does not interpret the values of 'tag' and 'ok'
  //  - ThreadManager WILL call DoWork() and pass '*tag' and 'ok' as input to
  //    DoWork()
  //
  // If the return value is SHUTDOWN:
  //  - ThreadManager WILL NOT call DoWork() and terminates the thread
  //
  // If the return value is TIMEOUT:
  //  - ThreadManager WILL NOT call DoWork()
  //  - ThreadManager MAY terminate the thread depending on the current number
  //    of active poller threads and min_pollers/max_pollers settings
  //  - Also, the value of timeout is specific to the derived class
  //    implementation
  virtual WorkStatus PollForWork(void** tag, bool* ok) = 0;

  // The implementation of DoWork() is supposed to perform the work found by
  // PollForWork(). The tag and ok parameters are the same as returned by
  // PollForWork(). The resources parameter indicates that the call actually
  // has the resources available for performing the RPC's work. If it doesn't,
  // the implementation should fail it appropriately.
  //
  // The implementation of DoWork() should also do any setup needed to ensure
  // that the next call to PollForWork() (not necessarily by the current thread)
  // actually finds some work
  virtual void DoWork(void* tag, bool ok, bool resources) = 0;

  // Mark the ThreadManager as shutdown and begin draining the work. This is a
  // non-blocking call and the caller should call Wait(), a blocking call which
  // returns only once the shutdown is complete
  virtual void Shutdown();

  // Returns whether Shutdown() has been called
  bool IsShutdown();

  // A blocking call that returns only after the ThreadManager has shutdown and
  // all the threads have drained all the outstanding work
  virtual void Wait();

  // Max number of concurrent threads that were ever active in this thread
  // manager so far. This is useful for debugging purposes (and in unit tests)
  // to check if resource_quota is properly being enforced.
  int GetMaxActiveThreadsSoFar();

 private:
  // Helper wrapper class around grpc_core::Thread. Takes a ThreadManager object
  // and starts a new grpc_core::Thread that calls the Run() function.
  //
  // The Run() function calls ThreadManager::MainWorkLoop() function and once
  // that completes, it marks the WorkerThread completed by calling
  // ThreadManager::MarkAsCompleted()
  //
  // WHY IS THIS NEEDED?:
  // When a thread terminates, some other thread *must* call Join() on that
  // thread so that the resources are released. Having a WorkerThread wrapper
  // will make this easier. Once Run() completes, each thread calls the
  // following two functions:
  //    ThreadManager::CleanupCompletedThreads()
  //    ThreadManager::MarkAsCompleted()
  //
  //  - MarkAsCompleted() puts the WorkerThread object in the ThreadManager's
  //    completed_threads_ list
  //  - CleanupCompletedThreads() calls "Join()" on the threads that are already
  //    in the completed_threads_ list  (since a thread cannot call Join() on
  //    itself, it calls CleanupCompletedThreads() *before* calling
  //    MarkAsCompleted())
  //
  // TODO(sreek): Consider creating the threads 'detached' so that Join() need
  // not be called (and the need for this WorkerThread class is eliminated)
  class WorkerThread {
   public:
    // 'explicit' so that a ThreadManager* is never silently converted into a
    // WorkerThread (which, per the class comment above, starts a thread).
    explicit WorkerThread(ThreadManager* thd_mgr);
    ~WorkerThread();

   private:
    // Calls thd_mgr_->MainWorkLoop() and once that completes, calls
    // thd_mgr_->MarkAsCompleted(this) to mark the thread as completed
    void Run();

    // The ThreadManager this worker belongs to; the pointer itself never
    // changes after construction.
    ThreadManager* const thd_mgr_;
    // The underlying thread wrapped by this object.
    grpc_core::Thread thd_;
  };

  // The main function in ThreadManager
  void MainWorkLoop();

  // Bookkeeping used by WorkerThread::Run() once MainWorkLoop() returns; see
  // the WorkerThread class comment for the required call order.
  void MarkAsCompleted(WorkerThread* thd);
  void CleanupCompletedThreads();

  // Protects shutdown_, num_pollers_, num_threads_ and
  // max_active_threads_sofar_
  std::mutex mu_;

  // Shutdown state reported by IsShutdown().
  bool shutdown_;
  // NOTE(review): presumably what Wait() blocks on until shutdown completes --
  // confirm in thread_manager.cc
  std::condition_variable shutdown_cv_;

  // The resource user object to use when requesting quota to create threads
  //
  // Note: The user of this ThreadManager object must create grpc_resource_quota
  // object (that contains the actual max thread quota) and a grpc_resource_user
  // object through which quota is requested whenever new threads need to be
  // created
  grpc_resource_user* resource_user_;

  // Number of threads doing polling
  int num_pollers_;

  // The minimum and maximum number of threads that should be doing polling
  int min_pollers_;
  int max_pollers_;

  // The total number of threads currently active (includes the threads that
  // are currently polling i.e num_pollers_)
  int num_threads_;

  // See GetMaxActiveThreadsSoFar()'s description.
  // To be more specific, this variable tracks the max value num_threads_ was
  // ever set so far
  int max_active_threads_sofar_;

  // NOTE(review): list_mu_ presumably guards completed_threads_ (it is not
  // among the members covered by mu_ above) -- confirm in thread_manager.cc
  std::mutex list_mu_;
  // WorkerThread objects that have finished running and still need Join()
  // called on them (filled by MarkAsCompleted(), drained by
  // CleanupCompletedThreads()).
  std::list<WorkerThread*> completed_threads_;
};

}  // namespace grpc

#endif  // GRPC_INTERNAL_CPP_THREAD_MANAGER_H