The JUCE cross-platform C++ framework, with DISTRHO/KXStudio specific changes
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

395 lines
10KB

  1. /*
  2. ==============================================================================
  3. This file is part of the JUCE library.
  4. Copyright (c) 2016 - ROLI Ltd.
  5. Permission is granted to use this software under the terms of the ISC license
  6. http://www.isc.org/downloads/software-support-policy/isc-license/
  7. Permission to use, copy, modify, and/or distribute this software for any
  8. purpose with or without fee is hereby granted, provided that the above
  9. copyright notice and this permission notice appear in all copies.
  10. THE SOFTWARE IS PROVIDED "AS IS" AND ISC DISCLAIMS ALL WARRANTIES WITH REGARD
  11. TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
  12. FITNESS. IN NO EVENT SHALL ISC BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT,
  13. OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
  14. USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER
  15. TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
  16. OF THIS SOFTWARE.
  17. -----------------------------------------------------------------------------
  18. To release a closed-source product which uses other parts of JUCE not
  19. licensed under the ISC terms, commercial licenses are available: visit
  20. www.juce.com for more information.
  21. ==============================================================================
  22. */
  23. class ThreadPool::ThreadPoolThread : public Thread
  24. {
  25. public:
  26. ThreadPoolThread (ThreadPool& p, size_t stackSize = 0)
  27. : Thread ("Pool", stackSize), currentJob (nullptr), pool (p)
  28. {
  29. }
  30. void run() override
  31. {
  32. while (! threadShouldExit())
  33. if (! pool.runNextJob (*this))
  34. wait (500);
  35. }
  36. ThreadPoolJob* volatile currentJob;
  37. ThreadPool& pool;
  38. JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
  39. };
  40. //==============================================================================
  41. ThreadPoolJob::ThreadPoolJob (const String& name)
  42. : jobName (name), pool (nullptr),
  43. shouldStop (false), isActive (false), shouldBeDeleted (false)
  44. {
  45. }
  46. ThreadPoolJob::~ThreadPoolJob()
  47. {
  48. // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
  49. // to remove it first!
  50. jassert (pool == nullptr || ! pool->contains (this));
  51. }
  52. String ThreadPoolJob::getJobName() const
  53. {
  54. return jobName;
  55. }
  56. void ThreadPoolJob::setJobName (const String& newName)
  57. {
  58. jobName = newName;
  59. }
  60. void ThreadPoolJob::signalJobShouldExit()
  61. {
  62. shouldStop = true;
  63. }
  64. ThreadPoolJob* ThreadPoolJob::getCurrentThreadPoolJob()
  65. {
  66. if (ThreadPool::ThreadPoolThread* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
  67. return t->currentJob;
  68. return nullptr;
  69. }
  70. //==============================================================================
  71. ThreadPool::ThreadPool (const int numThreads, size_t threadStackSize)
  72. {
  73. jassert (numThreads > 0); // not much point having a pool without any threads!
  74. createThreads (numThreads, threadStackSize);
  75. }
  76. ThreadPool::ThreadPool()
  77. {
  78. createThreads (SystemStats::getNumCpus());
  79. }
  80. ThreadPool::~ThreadPool()
  81. {
  82. removeAllJobs (true, 5000);
  83. stopThreads();
  84. }
  85. void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
  86. {
  87. for (int i = jmax (1, numThreads); --i >= 0;)
  88. threads.add (new ThreadPoolThread (*this, threadStackSize));
  89. for (int i = threads.size(); --i >= 0;)
  90. threads.getUnchecked(i)->startThread();
  91. }
  92. void ThreadPool::stopThreads()
  93. {
  94. for (int i = threads.size(); --i >= 0;)
  95. threads.getUnchecked(i)->signalThreadShouldExit();
  96. for (int i = threads.size(); --i >= 0;)
  97. threads.getUnchecked(i)->stopThread (500);
  98. }
  99. void ThreadPool::addJob (ThreadPoolJob* const job, const bool deleteJobWhenFinished)
  100. {
  101. jassert (job != nullptr);
  102. jassert (job->pool == nullptr);
  103. if (job->pool == nullptr)
  104. {
  105. job->pool = this;
  106. job->shouldStop = false;
  107. job->isActive = false;
  108. job->shouldBeDeleted = deleteJobWhenFinished;
  109. {
  110. const ScopedLock sl (lock);
  111. jobs.add (job);
  112. }
  113. for (int i = threads.size(); --i >= 0;)
  114. threads.getUnchecked(i)->notify();
  115. }
  116. }
  117. int ThreadPool::getNumJobs() const
  118. {
  119. return jobs.size();
  120. }
  121. int ThreadPool::getNumThreads() const
  122. {
  123. return threads.size();
  124. }
  125. ThreadPoolJob* ThreadPool::getJob (const int index) const
  126. {
  127. const ScopedLock sl (lock);
  128. return jobs [index];
  129. }
  130. bool ThreadPool::contains (const ThreadPoolJob* const job) const
  131. {
  132. const ScopedLock sl (lock);
  133. return jobs.contains (const_cast<ThreadPoolJob*> (job));
  134. }
  135. bool ThreadPool::isJobRunning (const ThreadPoolJob* const job) const
  136. {
  137. const ScopedLock sl (lock);
  138. return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
  139. }
  140. bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* const job, const int timeOutMs) const
  141. {
  142. if (job != nullptr)
  143. {
  144. const uint32 start = Time::getMillisecondCounter();
  145. while (contains (job))
  146. {
  147. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
  148. return false;
  149. jobFinishedSignal.wait (2);
  150. }
  151. }
  152. return true;
  153. }
  154. bool ThreadPool::removeJob (ThreadPoolJob* const job,
  155. const bool interruptIfRunning,
  156. const int timeOutMs)
  157. {
  158. bool dontWait = true;
  159. OwnedArray<ThreadPoolJob> deletionList;
  160. if (job != nullptr)
  161. {
  162. const ScopedLock sl (lock);
  163. if (jobs.contains (job))
  164. {
  165. if (job->isActive)
  166. {
  167. if (interruptIfRunning)
  168. job->signalJobShouldExit();
  169. dontWait = false;
  170. }
  171. else
  172. {
  173. jobs.removeFirstMatchingValue (job);
  174. addToDeleteList (deletionList, job);
  175. }
  176. }
  177. }
  178. return dontWait || waitForJobToFinish (job, timeOutMs);
  179. }
  180. bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, const int timeOutMs,
  181. ThreadPool::JobSelector* const selectedJobsToRemove)
  182. {
  183. Array <ThreadPoolJob*> jobsToWaitFor;
  184. {
  185. OwnedArray<ThreadPoolJob> deletionList;
  186. {
  187. const ScopedLock sl (lock);
  188. for (int i = jobs.size(); --i >= 0;)
  189. {
  190. ThreadPoolJob* const job = jobs.getUnchecked(i);
  191. if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
  192. {
  193. if (job->isActive)
  194. {
  195. jobsToWaitFor.add (job);
  196. if (interruptRunningJobs)
  197. job->signalJobShouldExit();
  198. }
  199. else
  200. {
  201. jobs.remove (i);
  202. addToDeleteList (deletionList, job);
  203. }
  204. }
  205. }
  206. }
  207. }
  208. const uint32 start = Time::getMillisecondCounter();
  209. for (;;)
  210. {
  211. for (int i = jobsToWaitFor.size(); --i >= 0;)
  212. {
  213. ThreadPoolJob* const job = jobsToWaitFor.getUnchecked (i);
  214. if (! isJobRunning (job))
  215. jobsToWaitFor.remove (i);
  216. }
  217. if (jobsToWaitFor.size() == 0)
  218. break;
  219. if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
  220. return false;
  221. jobFinishedSignal.wait (20);
  222. }
  223. return true;
  224. }
  225. StringArray ThreadPool::getNamesOfAllJobs (const bool onlyReturnActiveJobs) const
  226. {
  227. StringArray s;
  228. const ScopedLock sl (lock);
  229. for (int i = 0; i < jobs.size(); ++i)
  230. {
  231. const ThreadPoolJob* const job = jobs.getUnchecked(i);
  232. if (job->isActive || ! onlyReturnActiveJobs)
  233. s.add (job->getJobName());
  234. }
  235. return s;
  236. }
  237. bool ThreadPool::setThreadPriorities (const int newPriority)
  238. {
  239. bool ok = true;
  240. for (int i = threads.size(); --i >= 0;)
  241. if (! threads.getUnchecked(i)->setPriority (newPriority))
  242. ok = false;
  243. return ok;
  244. }
  245. ThreadPoolJob* ThreadPool::pickNextJobToRun()
  246. {
  247. OwnedArray<ThreadPoolJob> deletionList;
  248. {
  249. const ScopedLock sl (lock);
  250. for (int i = 0; i < jobs.size(); ++i)
  251. {
  252. ThreadPoolJob* job = jobs[i];
  253. if (job != nullptr && ! job->isActive)
  254. {
  255. if (job->shouldStop)
  256. {
  257. jobs.remove (i);
  258. addToDeleteList (deletionList, job);
  259. --i;
  260. continue;
  261. }
  262. job->isActive = true;
  263. return job;
  264. }
  265. }
  266. }
  267. return nullptr;
  268. }
  269. bool ThreadPool::runNextJob (ThreadPoolThread& thread)
  270. {
  271. if (ThreadPoolJob* const job = pickNextJobToRun())
  272. {
  273. ThreadPoolJob::JobStatus result = ThreadPoolJob::jobHasFinished;
  274. thread.currentJob = job;
  275. try
  276. {
  277. result = job->runJob();
  278. }
  279. catch (...)
  280. {
  281. jassertfalse; // Your runJob() method mustn't throw any exceptions!
  282. }
  283. thread.currentJob = nullptr;
  284. OwnedArray<ThreadPoolJob> deletionList;
  285. {
  286. const ScopedLock sl (lock);
  287. if (jobs.contains (job))
  288. {
  289. job->isActive = false;
  290. if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
  291. {
  292. jobs.removeFirstMatchingValue (job);
  293. addToDeleteList (deletionList, job);
  294. jobFinishedSignal.signal();
  295. }
  296. else
  297. {
  298. // move the job to the end of the queue if it wants another go
  299. jobs.move (jobs.indexOf (job), -1);
  300. }
  301. }
  302. }
  303. return true;
  304. }
  305. return false;
  306. }
  307. void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* const job) const
  308. {
  309. job->shouldStop = true;
  310. job->pool = nullptr;
  311. if (job->shouldBeDeleted)
  312. deletionList.add (job);
  313. }