The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

378 lines
9.5KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library - "Jules' Utility Class Extensions"
  4. Copyright 2004-11 by Raw Material Software Ltd.
  5. ------------------------------------------------------------------------------
  6. JUCE can be redistributed and/or modified under the terms of the GNU General
  7. Public License (Version 2), as published by the Free Software Foundation.
  8. A copy of the license is included in the JUCE distribution, or can be found
  9. online at www.gnu.org/licenses.
  10. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY
  11. WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  12. A PARTICULAR PURPOSE. See the GNU General Public License for more details.
  13. ------------------------------------------------------------------------------
  14. To release a closed-source product which uses JUCE, commercial licenses are
  15. available: visit www.rawmaterialsoftware.com/juce for more information.
  16. ==============================================================================
  17. */
  18. ThreadPoolJob::ThreadPoolJob (const String& name)
  19. : jobName (name),
  20. pool (nullptr),
  21. shouldStop (false),
  22. isActive (false),
  23. shouldBeDeleted (false)
  24. {
  25. }
  26. ThreadPoolJob::~ThreadPoolJob()
  27. {
  28. // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
  29. // to remove it first!
  30. jassert (pool == nullptr || ! pool->contains (this));
  31. }
  32. String ThreadPoolJob::getJobName() const
  33. {
  34. return jobName;
  35. }
  36. void ThreadPoolJob::setJobName (const String& newName)
  37. {
  38. jobName = newName;
  39. }
  40. void ThreadPoolJob::signalJobShouldExit()
  41. {
  42. shouldStop = true;
  43. }
  44. //==============================================================================
  45. class ThreadPool::ThreadPoolThread : public Thread
  46. {
  47. public:
  48. ThreadPoolThread (ThreadPool& pool_)
  49. : Thread ("Pool"),
  50. pool (pool_)
  51. {
  52. }
  53. void run()
  54. {
  55. while (! threadShouldExit())
  56. {
  57. if (! pool.runNextJob())
  58. wait (500);
  59. }
  60. }
  61. private:
  62. ThreadPool& pool;
  63. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread);
  64. };
  65. //==============================================================================
  66. ThreadPool::ThreadPool (const int numThreads)
  67. {
  68. jassert (numThreads > 0); // not much point having a pool without any threads!
  69. createThreads (numThreads);
  70. }
  71. ThreadPool::ThreadPool()
  72. {
  73. createThreads (SystemStats::getNumCpus());
  74. }
  75. ThreadPool::~ThreadPool()
  76. {
  77. removeAllJobs (true, 5000);
  78. stopThreads();
  79. }
  80. void ThreadPool::createThreads (int numThreads)
  81. {
  82. for (int i = jmax (1, numThreads); --i >= 0;)
  83. threads.add (new ThreadPoolThread (*this));
  84. for (int i = threads.size(); --i >= 0;)
  85. threads.getUnchecked(i)->startThread();
  86. }
  87. void ThreadPool::stopThreads()
  88. {
  89. for (int i = threads.size(); --i >= 0;)
  90. threads.getUnchecked(i)->signalThreadShouldExit();
  91. for (int i = threads.size(); --i >= 0;)
  92. threads.getUnchecked(i)->stopThread (500);
  93. }
  94. void ThreadPool::addJob (ThreadPoolJob* const job, const bool deleteJobWhenFinished)
  95. {
  96. jassert (job != nullptr);
  97. jassert (job->pool == nullptr);
  98. if (job->pool == nullptr)
  99. {
  100. job->pool = this;
  101. job->shouldStop = false;
  102. job->isActive = false;
  103. job->shouldBeDeleted = deleteJobWhenFinished;
  104. {
  105. const ScopedLock sl (lock);
  106. jobs.add (job);
  107. }
  108. for (int i = threads.size(); --i >= 0;)
  109. threads.getUnchecked(i)->notify();
  110. }
  111. }
  112. int ThreadPool::getNumJobs() const
  113. {
  114. return jobs.size();
  115. }
  116. ThreadPoolJob* ThreadPool::getJob (const int index) const
  117. {
  118. const ScopedLock sl (lock);
  119. return jobs [index];
  120. }
  121. bool ThreadPool::contains (const ThreadPoolJob* const job) const
  122. {
  123. const ScopedLock sl (lock);
  124. return jobs.contains (const_cast <ThreadPoolJob*> (job));
  125. }
  126. bool ThreadPool::isJobRunning (const ThreadPoolJob* const job) const
  127. {
  128. const ScopedLock sl (lock);
  129. return jobs.contains (const_cast <ThreadPoolJob*> (job)) && job->isActive;
  130. }
  131. bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* const job,
  132. const int timeOutMs) const
  133. {
  134. if (job != nullptr)
  135. {
  136. const uint32 start = Time::getMillisecondCounter();
  137. while (contains (job))
  138. {
  139. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs)
  140. return false;
  141. jobFinishedSignal.wait (2);
  142. }
  143. }
  144. return true;
  145. }
  146. bool ThreadPool::removeJob (ThreadPoolJob* const job,
  147. const bool interruptIfRunning,
  148. const int timeOutMs)
  149. {
  150. bool dontWait = true;
  151. OwnedArray<ThreadPoolJob> deletionList;
  152. if (job != nullptr)
  153. {
  154. const ScopedLock sl (lock);
  155. if (jobs.contains (job))
  156. {
  157. if (job->isActive)
  158. {
  159. if (interruptIfRunning)
  160. job->signalJobShouldExit();
  161. dontWait = false;
  162. }
  163. else
  164. {
  165. jobs.removeValue (job);
  166. addToDeleteList (deletionList, job);
  167. }
  168. }
  169. }
  170. return dontWait || waitForJobToFinish (job, timeOutMs);
  171. }
  172. bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, const int timeOutMs,
  173. ThreadPool::JobSelector* selectedJobsToRemove)
  174. {
  175. Array <ThreadPoolJob*> jobsToWaitFor;
  176. {
  177. OwnedArray<ThreadPoolJob> deletionList;
  178. {
  179. const ScopedLock sl (lock);
  180. for (int i = jobs.size(); --i >= 0;)
  181. {
  182. ThreadPoolJob* const job = jobs.getUnchecked(i);
  183. if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
  184. {
  185. if (job->isActive)
  186. {
  187. jobsToWaitFor.add (job);
  188. if (interruptRunningJobs)
  189. job->signalJobShouldExit();
  190. }
  191. else
  192. {
  193. jobs.remove (i);
  194. addToDeleteList (deletionList, job);
  195. }
  196. }
  197. }
  198. }
  199. }
  200. const uint32 start = Time::getMillisecondCounter();
  201. for (;;)
  202. {
  203. for (int i = jobsToWaitFor.size(); --i >= 0;)
  204. {
  205. ThreadPoolJob* const job = jobsToWaitFor.getUnchecked (i);
  206. if (! isJobRunning (job))
  207. jobsToWaitFor.remove (i);
  208. }
  209. if (jobsToWaitFor.size() == 0)
  210. break;
  211. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs)
  212. return false;
  213. jobFinishedSignal.wait (20);
  214. }
  215. return true;
  216. }
  217. StringArray ThreadPool::getNamesOfAllJobs (const bool onlyReturnActiveJobs) const
  218. {
  219. StringArray s;
  220. const ScopedLock sl (lock);
  221. for (int i = 0; i < jobs.size(); ++i)
  222. {
  223. const ThreadPoolJob* const job = jobs.getUnchecked(i);
  224. if (job->isActive || ! onlyReturnActiveJobs)
  225. s.add (job->getJobName());
  226. }
  227. return s;
  228. }
  229. bool ThreadPool::setThreadPriorities (const int newPriority)
  230. {
  231. bool ok = true;
  232. for (int i = threads.size(); --i >= 0;)
  233. if (! threads.getUnchecked(i)->setPriority (newPriority))
  234. ok = false;
  235. return ok;
  236. }
  237. ThreadPoolJob* ThreadPool::pickNextJobToRun()
  238. {
  239. OwnedArray<ThreadPoolJob> deletionList;
  240. {
  241. const ScopedLock sl (lock);
  242. for (int i = 0; i < jobs.size(); ++i)
  243. {
  244. ThreadPoolJob* job = jobs[i];
  245. if (job != nullptr && ! job->isActive)
  246. {
  247. if (job->shouldStop)
  248. {
  249. jobs.remove (i);
  250. addToDeleteList (deletionList, job);
  251. --i;
  252. continue;
  253. }
  254. job->isActive = true;
  255. return job;
  256. }
  257. }
  258. }
  259. return nullptr;
  260. }
  261. bool ThreadPool::runNextJob()
  262. {
  263. ThreadPoolJob* const job = pickNextJobToRun();
  264. if (job == nullptr)
  265. return false;
  266. ThreadPoolJob::JobStatus result = ThreadPoolJob::jobHasFinished;
  267. JUCE_TRY
  268. {
  269. result = job->runJob();
  270. }
  271. JUCE_CATCH_ALL_ASSERT
  272. OwnedArray<ThreadPoolJob> deletionList;
  273. {
  274. const ScopedLock sl (lock);
  275. if (jobs.contains (job))
  276. {
  277. job->isActive = false;
  278. if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
  279. {
  280. jobs.removeValue (job);
  281. addToDeleteList (deletionList, job);
  282. jobFinishedSignal.signal();
  283. }
  284. else
  285. {
  286. // move the job to the end of the queue if it wants another go
  287. jobs.move (jobs.indexOf (job), -1);
  288. }
  289. }
  290. }
  291. return true;
  292. }
  293. void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* const job) const
  294. {
  295. job->shouldStop = true;
  296. job->pool = nullptr;
  297. if (job->shouldBeDeleted)
  298. deletionList.add (job);
  299. }