/* ============================================================================== This file is part of the JUCE library - "Jules' Utility Class Extensions" Copyright 2004-11 by Raw Material Software Ltd. ------------------------------------------------------------------------------ JUCE can be redistributed and/or modified under the terms of the GNU General Public License (Version 2), as published by the Free Software Foundation. A copy of the license is included in the JUCE distribution, or can be found online at www.gnu.org/licenses. JUCE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. ------------------------------------------------------------------------------ To release a closed-source product which uses JUCE, commercial licenses are available: visit www.rawmaterialsoftware.com/juce for more information. ============================================================================== */ BEGIN_JUCE_NAMESPACE //============================================================================== ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name), pool (nullptr), shouldStop (false), isActive (false), shouldBeDeleted (false) { } ThreadPoolJob::~ThreadPoolJob() { // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob() // to remove it first! jassert (pool == nullptr || ! pool->contains (this)); } String ThreadPoolJob::getJobName() const { return jobName; } void ThreadPoolJob::setJobName (const String& newName) { jobName = newName; } void ThreadPoolJob::signalJobShouldExit() { shouldStop = true; } //============================================================================== class ThreadPool::ThreadPoolThread : public Thread { public: ThreadPoolThread (ThreadPool& pool_) : Thread ("Pool"), pool (pool_), busy (false) { } void run() { while (! threadShouldExit()) { if (! pool.runNextJob()) wait (500); } } private: ThreadPool& pool; bool volatile busy; JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread); }; //============================================================================== ThreadPool::ThreadPool (const int numThreads, const bool startThreadsOnlyWhenNeeded, const int stopThreadsWhenNotUsedTimeoutMs) : threadStopTimeout (stopThreadsWhenNotUsedTimeoutMs), priority (5) { jassert (numThreads > 0); // not much point having one of these with no threads in it. for (int i = jmax (1, numThreads); --i >= 0;) threads.add (new ThreadPoolThread (*this)); if (! startThreadsOnlyWhenNeeded) for (int i = threads.size(); --i >= 0;) threads.getUnchecked(i)->startThread (priority); } ThreadPool::~ThreadPool() { removeAllJobs (true, 4000); int i; for (i = threads.size(); --i >= 0;) threads.getUnchecked(i)->signalThreadShouldExit(); for (i = threads.size(); --i >= 0;) threads.getUnchecked(i)->stopThread (500); } void ThreadPool::addJob (ThreadPoolJob* const job) { jassert (job != nullptr); jassert (job->pool == nullptr); if (job->pool == nullptr) { job->pool = this; job->shouldStop = false; job->isActive = false; { const ScopedLock sl (lock); jobs.add (job); int numRunning = 0; for (int i = threads.size(); --i >= 0;) if (threads.getUnchecked(i)->isThreadRunning() && ! threads.getUnchecked(i)->threadShouldExit()) ++numRunning; if (numRunning < threads.size()) { bool startedOne = false; int n = 1000; while (--n >= 0 && ! startedOne) { for (int i = threads.size(); --i >= 0;) { if (! threads.getUnchecked(i)->isThreadRunning()) { threads.getUnchecked(i)->startThread (priority); startedOne = true; break; } } if (! startedOne) Thread::sleep (2); } } } for (int i = threads.size(); --i >= 0;) threads.getUnchecked(i)->notify(); } } int ThreadPool::getNumJobs() const { return jobs.size(); } ThreadPoolJob* ThreadPool::getJob (const int index) const { const ScopedLock sl (lock); return jobs [index]; } bool ThreadPool::contains (const ThreadPoolJob* const job) const { const ScopedLock sl (lock); return jobs.contains (const_cast (job)); } bool ThreadPool::isJobRunning (const ThreadPoolJob* const job) const { const ScopedLock sl (lock); return jobs.contains (const_cast (job)) && job->isActive; } bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* const job, const int timeOutMs) const { if (job != nullptr) { const uint32 start = Time::getMillisecondCounter(); while (contains (job)) { if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs) return false; jobFinishedSignal.wait (2); } } return true; } bool ThreadPool::removeJob (ThreadPoolJob* const job, const bool interruptIfRunning, const int timeOutMs) { bool dontWait = true; if (job != nullptr) { const ScopedLock sl (lock); if (jobs.contains (job)) { if (job->isActive) { if (interruptIfRunning) job->signalJobShouldExit(); dontWait = false; } else { jobs.removeValue (job); job->pool = nullptr; } } } return dontWait || waitForJobToFinish (job, timeOutMs); } bool ThreadPool::removeAllJobs (const bool interruptRunningJobs, const int timeOutMs, const bool deleteInactiveJobs, ThreadPool::JobSelector* selectedJobsToRemove) { Array jobsToWaitFor; { const ScopedLock sl (lock); for (int i = jobs.size(); --i >= 0;) { ThreadPoolJob* const job = jobs.getUnchecked(i); if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job)) { if (job->isActive) { jobsToWaitFor.add (job); if (interruptRunningJobs) job->signalJobShouldExit(); } else { jobs.remove (i); if (deleteInactiveJobs) delete job; else job->pool = nullptr; } } } } const uint32 start = Time::getMillisecondCounter(); for (;;) { for (int i = jobsToWaitFor.size(); --i >= 0;) if (! isJobRunning (jobsToWaitFor.getUnchecked (i))) jobsToWaitFor.remove (i); if (jobsToWaitFor.size() == 0) break; if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + timeOutMs) return false; jobFinishedSignal.wait (20); } return true; } StringArray ThreadPool::getNamesOfAllJobs (const bool onlyReturnActiveJobs) const { StringArray s; const ScopedLock sl (lock); for (int i = 0; i < jobs.size(); ++i) { const ThreadPoolJob* const job = jobs.getUnchecked(i); if (job->isActive || ! onlyReturnActiveJobs) s.add (job->getJobName()); } return s; } bool ThreadPool::setThreadPriorities (const int newPriority) { bool ok = true; if (priority != newPriority) { priority = newPriority; for (int i = threads.size(); --i >= 0;) if (! threads.getUnchecked(i)->setPriority (newPriority)) ok = false; } return ok; } bool ThreadPool::runNextJob() { ThreadPoolJob* job = nullptr; { const ScopedLock sl (lock); for (int i = 0; i < jobs.size(); ++i) { job = jobs[i]; if (job != nullptr && ! (job->isActive || job->shouldStop)) break; job = nullptr; } if (job != nullptr) job->isActive = true; } if (job != nullptr) { JUCE_TRY { ThreadPoolJob::JobStatus result = job->runJob(); lastJobEndTime = Time::getApproximateMillisecondCounter(); const ScopedLock sl (lock); if (jobs.contains (job)) { job->isActive = false; if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop) { job->pool = nullptr; job->shouldStop = true; jobs.removeValue (job); if (result == ThreadPoolJob::jobHasFinishedAndShouldBeDeleted) delete job; jobFinishedSignal.signal(); } else { // move the job to the end of the queue if it wants another go jobs.move (jobs.indexOf (job), -1); } } } #if JUCE_CATCH_UNHANDLED_EXCEPTIONS catch (...) { const ScopedLock sl (lock); jobs.removeValue (job); } #endif } else { if (threadStopTimeout > 0 && Time::getApproximateMillisecondCounter() > lastJobEndTime + threadStopTimeout) { const ScopedLock sl (lock); if (jobs.size() == 0) for (int i = threads.size(); --i >= 0;) threads.getUnchecked(i)->signalThreadShouldExit(); } else { return false; } } return true; } END_JUCE_NAMESPACE