The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

404 lines
11KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. ThreadPoolJob::ThreadPoolJob (const String& name)
  19. : jobName (name),
  20. pool (nullptr),
  21. shouldStop (false),
  22. isActive (false),
  23. shouldBeDeleted (false)
  24. {
  25. }
  26. ThreadPoolJob::~ThreadPoolJob()
  27. {
  28. // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
  29. // to remove it first!
  30. jassert (pool == nullptr || ! pool->contains (this));
  31. }
  32. String ThreadPoolJob::getJobName() const
  33. {
  34. return jobName;
  35. }
  36. void ThreadPoolJob::setJobName (const String& newName)
  37. {
  38. jobName = newName;
  39. }
  40. void ThreadPoolJob::signalJobShouldExit()
  41. {
  42. shouldStop = true;
  43. }
  44. //==============================================================================
  45. class ThreadPool::ThreadPoolThread : public Thread
  46. {
  47. public:
  48. ThreadPoolThread (ThreadPool& pool_)
  49. : Thread ("Pool"),
  50. pool (pool_),
  51. busy (false)
  52. {
  53. }
  54. void run()
  55. {
  56. while (! threadShouldExit())
  57. {
  58. if (! pool.runNextJob())
  59. wait (500);
  60. }
  61. }
  62. private:
  63. ThreadPool& pool;
  64. bool volatile busy;
  65. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread);
  66. };
  67. //==============================================================================
  68. ThreadPool::ThreadPool (const int numThreads,
  69. const bool startThreadsOnlyWhenNeeded,
  70. const int stopThreadsWhenNotUsedTimeoutMs)
  71. : threadStopTimeout (stopThreadsWhenNotUsedTimeoutMs),
  72. priority (5)
  73. {
  74. jassert (numThreads > 0); // not much point having one of these with no threads in it.
  75. for (int i = jmax (1, numThreads); --i >= 0;)
  76. threads.add (new ThreadPoolThread (*this));
  77. if (! startThreadsOnlyWhenNeeded)
  78. for (int i = threads.size(); --i >= 0;)
  79. threads.getUnchecked(i)->startThread (priority);
  80. }
  81. ThreadPool::~ThreadPool()
  82. {
  83. removeAllJobs (true, 4000);
  84. int i;
  85. for (i = threads.size(); --i >= 0;)
  86. threads.getUnchecked(i)->signalThreadShouldExit();
  87. for (i = threads.size(); --i >= 0;)
  88. threads.getUnchecked(i)->stopThread (500);
  89. }
  90. void ThreadPool::addJob (ThreadPoolJob* const job)
  91. {
  92. jassert (job != nullptr);
  93. jassert (job->pool == nullptr);
  94. if (job->pool == nullptr)
  95. {
  96. job->pool = this;
  97. job->shouldStop = false;
  98. job->isActive = false;
  99. {
  100. const ScopedLock sl (lock);
  101. jobs.add (job);
  102. int numRunning = 0;
  103. for (int i = threads.size(); --i >= 0;)
  104. if (threads.getUnchecked(i)->isThreadRunning() && ! threads.getUnchecked(i)->threadShouldExit())
  105. ++numRunning;
  106. if (numRunning < threads.size())
  107. {
  108. bool startedOne = false;
  109. int n = 1000;
  110. while (--n >= 0 && ! startedOne)
  111. {
  112. for (int i = threads.size(); --i >= 0;)
  113. {
  114. if (! threads.getUnchecked(i)->isThreadRunning())
  115. {
  116. threads.getUnchecked(i)->startThread (priority);
  117. startedOne = true;
  118. break;
  119. }
  120. }
  121. if (! startedOne)
  122. Thread::sleep (2);
  123. }
  124. }
  125. }
  126. for (int i = threads.size(); --i >= 0;)
  127. threads.getUnchecked(i)->notify();
  128. }
  129. }
  130. int ThreadPool::getNumJobs() const
  131. {
  132. return jobs.size();
  133. }
  134. ThreadPoolJob* ThreadPool::getJob (const int index) const
  135. {
  136. const ScopedLock sl (lock);
  137. return jobs [index];
  138. }
  139. bool ThreadPool::contains (const ThreadPoolJob* const job) const
  140. {
  141. const ScopedLock sl (lock);
  142. return jobs.contains (const_cast <ThreadPoolJob*> (job));
  143. }
  144. bool ThreadPool::isJobRunning (const ThreadPoolJob* const job) const
  145. {
  146. const ScopedLock sl (lock);
  147. return jobs.contains (const_cast <ThreadPoolJob*> (job)) && job->isActive;
  148. }
  149. bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* const job,
  150. const int timeOutMs) const
  151. {
  152. if (job != nullptr)
  153. {
  154. const uint32 start = Time::getMillisecondCounter();
  155. while (contains (job))
  156. {
  157. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs)
  158. return false;
  159. jobFinishedSignal.wait (2);
  160. }
  161. }
  162. return true;
  163. }
  164. bool ThreadPool::removeJob (ThreadPoolJob* const job,
  165. const bool interruptIfRunning,
  166. const int timeOutMs)
  167. {
  168. bool dontWait = true;
  169. if (job != nullptr)
  170. {
  171. const ScopedLock sl (lock);
  172. if (jobs.contains (job))
  173. {
  174. if (job->isActive)
  175. {
  176. if (interruptIfRunning)
  177. job->signalJobShouldExit();
  178. dontWait = false;
  179. }
  180. else
  181. {
  182. jobs.removeValue (job);
  183. job->pool = nullptr;
  184. }
  185. }
  186. }
  187. return dontWait || waitForJobToFinish (job, timeOutMs);
  188. }
  189. bool ThreadPool::removeAllJobs (const bool interruptRunningJobs,
  190. const int timeOutMs,
  191. const bool deleteInactiveJobs,
  192. ThreadPool::JobSelector* selectedJobsToRemove)
  193. {
  194. Array <ThreadPoolJob*> jobsToWaitFor;
  195. {
  196. const ScopedLock sl (lock);
  197. for (int i = jobs.size(); --i >= 0;)
  198. {
  199. ThreadPoolJob* const job = jobs.getUnchecked(i);
  200. if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
  201. {
  202. if (job->isActive)
  203. {
  204. jobsToWaitFor.add (job);
  205. if (interruptRunningJobs)
  206. job->signalJobShouldExit();
  207. }
  208. else
  209. {
  210. jobs.remove (i);
  211. if (deleteInactiveJobs)
  212. delete job;
  213. else
  214. job->pool = nullptr;
  215. }
  216. }
  217. }
  218. }
  219. const uint32 start = Time::getMillisecondCounter();
  220. for (;;)
  221. {
  222. for (int i = jobsToWaitFor.size(); --i >= 0;)
  223. if (! isJobRunning (jobsToWaitFor.getUnchecked (i)))
  224. jobsToWaitFor.remove (i);
  225. if (jobsToWaitFor.size() == 0)
  226. break;
  227. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs)
  228. return false;
  229. jobFinishedSignal.wait (20);
  230. }
  231. return true;
  232. }
  233. StringArray ThreadPool::getNamesOfAllJobs (const bool onlyReturnActiveJobs) const
  234. {
  235. StringArray s;
  236. const ScopedLock sl (lock);
  237. for (int i = 0; i < jobs.size(); ++i)
  238. {
  239. const ThreadPoolJob* const job = jobs.getUnchecked(i);
  240. if (job->isActive || ! onlyReturnActiveJobs)
  241. s.add (job->getJobName());
  242. }
  243. return s;
  244. }
  245. bool ThreadPool::setThreadPriorities (const int newPriority)
  246. {
  247. bool ok = true;
  248. if (priority != newPriority)
  249. {
  250. priority = newPriority;
  251. for (int i = threads.size(); --i >= 0;)
  252. if (! threads.getUnchecked(i)->setPriority (newPriority))
  253. ok = false;
  254. }
  255. return ok;
  256. }
  257. bool ThreadPool::runNextJob()
  258. {
  259. ThreadPoolJob* job = nullptr;
  260. {
  261. const ScopedLock sl (lock);
  262. for (int i = 0; i < jobs.size(); ++i)
  263. {
  264. job = jobs[i];
  265. if (job != nullptr && ! (job->isActive || job->shouldStop))
  266. break;
  267. job = nullptr;
  268. }
  269. if (job != nullptr)
  270. job->isActive = true;
  271. }
  272. if (job != nullptr)
  273. {
  274. JUCE_TRY
  275. {
  276. ThreadPoolJob::JobStatus result = job->runJob();
  277. lastJobEndTime = Time::getApproximateMillisecondCounter();
  278. const ScopedLock sl (lock);
  279. if (jobs.contains (job))
  280. {
  281. job->isActive = false;
  282. if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
  283. {
  284. job->pool = nullptr;
  285. job->shouldStop = true;
  286. jobs.removeValue (job);
  287. if (result == ThreadPoolJob::jobHasFinishedAndShouldBeDeleted)
  288. delete job;
  289. jobFinishedSignal.signal();
  290. }
  291. else
  292. {
  293. // move the job to the end of the queue if it wants another go
  294. jobs.move (jobs.indexOf (job), -1);
  295. }
  296. }
  297. }
  298. #if JUCE_CATCH_UNHANDLED_EXCEPTIONS
  299. catch (...)
  300. {
  301. const ScopedLock sl (lock);
  302. jobs.removeValue (job);
  303. }
  304. #endif
  305. }
  306. else
  307. {
  308. if (threadStopTimeout > 0
  309. && Time::getApproximateMillisecondCounter() > lastJobEndTime + threadStopTimeout)
  310. {
  311. const ScopedLock sl (lock);
  312. if (jobs.size() == 0)
  313. for (int i = threads.size(); --i >= 0;)
  314. threads.getUnchecked(i)->signalThreadShouldExit();
  315. }
  316. else
  317. {
  318. return false;
  319. }
  320. }
  321. return true;
  322. }