OpenShot Library | libopenshot-audio  0.2.0
juce_ThreadPool.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2017 - ROLI Ltd.
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
27 {
28  ThreadPoolThread (ThreadPool& p, size_t stackSize)
29  : Thread ("Pool", stackSize), pool (p)
30  {
31  }
32 
33  void run() override
34  {
35  while (! threadShouldExit())
36  if (! pool.runNextJob (*this))
37  wait (500);
38  }
39 
40  std::atomic<ThreadPoolJob*> currentJob { nullptr };
41  ThreadPool& pool;
42 
43  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
44 };
45 
46 //==============================================================================
47 ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
48 {
49 }
50 
52 {
53  // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
54  // to remove it first!
55  jassert (pool == nullptr || ! pool->contains (this));
56 }
57 
59 {
60  return jobName;
61 }
62 
63 void ThreadPoolJob::setJobName (const String& newName)
64 {
65  jobName = newName;
66 }
67 
69 {
70  shouldStop = true;
71  listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
72 }
73 
75 {
76  listeners.add (listener);
77 }
78 
80 {
81  listeners.remove (listener);
82 }
83 
85 {
86  if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
87  return t->currentJob.load();
88 
89  return nullptr;
90 }
91 
92 //==============================================================================
93 ThreadPool::ThreadPool (int numThreads, size_t threadStackSize)
94 {
95  jassert (numThreads > 0); // not much point having a pool without any threads!
96 
97  createThreads (numThreads, threadStackSize);
98 }
99 
101 {
102  createThreads (SystemStats::getNumCpus());
103 }
104 
106 {
107  removeAllJobs (true, 5000);
108  stopThreads();
109 }
110 
111 void ThreadPool::createThreads (int numThreads, size_t threadStackSize)
112 {
113  for (int i = jmax (1, numThreads); --i >= 0;)
114  threads.add (new ThreadPoolThread (*this, threadStackSize));
115 
116  for (auto* t : threads)
117  t->startThread();
118 }
119 
120 void ThreadPool::stopThreads()
121 {
122  for (auto* t : threads)
123  t->signalThreadShouldExit();
124 
125  for (auto* t : threads)
126  t->stopThread (500);
127 }
128 
129 void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
130 {
131  jassert (job != nullptr);
132  jassert (job->pool == nullptr);
133 
134  if (job->pool == nullptr)
135  {
136  job->pool = this;
137  job->shouldStop = false;
138  job->isActive = false;
139  job->shouldBeDeleted = deleteJobWhenFinished;
140 
141  {
142  const ScopedLock sl (lock);
143  jobs.add (job);
144  }
145 
146  for (auto* t : threads)
147  t->notify();
148  }
149 }
150 
151 void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
152 {
153  struct LambdaJobWrapper : public ThreadPoolJob
154  {
155  LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
156  JobStatus runJob() override { return job(); }
157 
158  std::function<ThreadPoolJob::JobStatus()> job;
159  };
160 
161  addJob (new LambdaJobWrapper (jobToRun), true);
162 }
163 
164 void ThreadPool::addJob (std::function<void()> jobToRun)
165 {
166  struct LambdaJobWrapper : public ThreadPoolJob
167  {
168  LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (j) {}
169  JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
170 
171  std::function<void()> job;
172  };
173 
174  addJob (new LambdaJobWrapper (jobToRun), true);
175 }
176 
177 int ThreadPool::getNumJobs() const noexcept
178 {
179  return jobs.size();
180 }
181 
182 int ThreadPool::getNumThreads() const noexcept
183 {
184  return threads.size();
185 }
186 
187 ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
188 {
189  const ScopedLock sl (lock);
190  return jobs [index];
191 }
192 
193 bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
194 {
195  const ScopedLock sl (lock);
196  return jobs.contains (const_cast<ThreadPoolJob*> (job));
197 }
198 
199 bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
200 {
201  const ScopedLock sl (lock);
202  return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
203 }
204 
205 void ThreadPool::moveJobToFront (const ThreadPoolJob* job) noexcept
206 {
207  const ScopedLock sl (lock);
208 
209  auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
210 
211  if (index > 0 && ! job->isActive)
212  jobs.move (index, 0);
213 }
214 
215 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
216 {
217  if (job != nullptr)
218  {
219  auto start = Time::getMillisecondCounter();
220 
221  while (contains (job))
222  {
223  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
224  return false;
225 
226  jobFinishedSignal.wait (2);
227  }
228  }
229 
230  return true;
231 }
232 
233 bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
234 {
235  bool dontWait = true;
236  OwnedArray<ThreadPoolJob> deletionList;
237 
238  if (job != nullptr)
239  {
240  const ScopedLock sl (lock);
241 
242  if (jobs.contains (job))
243  {
244  if (job->isActive)
245  {
246  if (interruptIfRunning)
247  job->signalJobShouldExit();
248 
249  dontWait = false;
250  }
251  else
252  {
253  jobs.removeFirstMatchingValue (job);
254  addToDeleteList (deletionList, job);
255  }
256  }
257  }
258 
259  return dontWait || waitForJobToFinish (job, timeOutMs);
260 }
261 
262 bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
263  ThreadPool::JobSelector* selectedJobsToRemove)
264 {
265  Array<ThreadPoolJob*> jobsToWaitFor;
266 
267  {
268  OwnedArray<ThreadPoolJob> deletionList;
269 
270  {
271  const ScopedLock sl (lock);
272 
273  for (int i = jobs.size(); --i >= 0;)
274  {
275  auto* job = jobs.getUnchecked(i);
276 
277  if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
278  {
279  if (job->isActive)
280  {
281  jobsToWaitFor.add (job);
282 
283  if (interruptRunningJobs)
284  job->signalJobShouldExit();
285  }
286  else
287  {
288  jobs.remove (i);
289  addToDeleteList (deletionList, job);
290  }
291  }
292  }
293  }
294  }
295 
296  auto start = Time::getMillisecondCounter();
297 
298  for (;;)
299  {
300  for (int i = jobsToWaitFor.size(); --i >= 0;)
301  {
302  auto* job = jobsToWaitFor.getUnchecked (i);
303 
304  if (! isJobRunning (job))
305  jobsToWaitFor.remove (i);
306  }
307 
308  if (jobsToWaitFor.size() == 0)
309  break;
310 
311  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
312  return false;
313 
314  jobFinishedSignal.wait (20);
315  }
316 
317  return true;
318 }
319 
320 StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
321 {
322  StringArray s;
323  const ScopedLock sl (lock);
324 
325  for (auto* job : jobs)
326  if (job->isActive || ! onlyReturnActiveJobs)
327  s.add (job->getJobName());
328 
329  return s;
330 }
331 
332 bool ThreadPool::setThreadPriorities (int newPriority)
333 {
334  bool ok = true;
335 
336  for (auto* t : threads)
337  if (! t->setPriority (newPriority))
338  ok = false;
339 
340  return ok;
341 }
342 
343 ThreadPoolJob* ThreadPool::pickNextJobToRun()
344 {
345  OwnedArray<ThreadPoolJob> deletionList;
346 
347  {
348  const ScopedLock sl (lock);
349 
350  for (int i = 0; i < jobs.size(); ++i)
351  {
352  if (auto* job = jobs[i])
353  {
354  if (! job->isActive)
355  {
356  if (job->shouldStop)
357  {
358  jobs.remove (i);
359  addToDeleteList (deletionList, job);
360  --i;
361  continue;
362  }
363 
364  job->isActive = true;
365  return job;
366  }
367  }
368  }
369  }
370 
371  return nullptr;
372 }
373 
374 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
375 {
376  if (auto* job = pickNextJobToRun())
377  {
378  auto result = ThreadPoolJob::jobHasFinished;
379  thread.currentJob = job;
380 
381  try
382  {
383  result = job->runJob();
384  }
385  catch (...)
386  {
387  jassertfalse; // Your runJob() method mustn't throw any exceptions!
388  }
389 
390  thread.currentJob = nullptr;
391 
392  OwnedArray<ThreadPoolJob> deletionList;
393 
394  {
395  const ScopedLock sl (lock);
396 
397  if (jobs.contains (job))
398  {
399  job->isActive = false;
400 
401  if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
402  {
403  jobs.removeFirstMatchingValue (job);
404  addToDeleteList (deletionList, job);
405 
406  jobFinishedSignal.signal();
407  }
408  else
409  {
410  // move the job to the end of the queue if it wants another go
411  jobs.move (jobs.indexOf (job), -1);
412  }
413  }
414  }
415 
416  return true;
417  }
418 
419  return false;
420 }
421 
422 void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
423 {
424  job->shouldStop = true;
425  job->pool = nullptr;
426 
427  if (job->shouldBeDeleted)
428  deletionList.add (job);
429 }
430 
431 } // namespace juce
int getNumJobs() const noexcept
Returns the number of jobs currently running or queued.
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Waits until a job has finished running and has been removed from the pool.
void removeListener(Thread::Listener *)
Removes a listener added with addListener.
int getNumThreads() const noexcept
Returns the number of threads assigned to this thread pool.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
Definition: juce_Thread.cpp:26
void add(const ElementType &newElement)
Appends a new element at the end of the array.
Definition: juce_Array.h:375
A callback class used when you need to select which ThreadPoolJob objects are suitable for some kind ...
virtual bool isJobSuitable(ThreadPoolJob *job)=0
Should return true if the specified thread matches your criteria for whatever operation that this obj...
A special array for holding a list of strings.
The JUCE String class!
Definition: juce_String.h:42
virtual ~ThreadPoolJob()
Destructor.
ElementType getUnchecked(int index) const
Returns one of the elements in the array, without checking the index passed in.
Definition: juce_Array.h:256
virtual JobStatus runJob()=0
Performs the actual work that this job needs to do.
void run() override
Must be implemented to perform the thread's actual code.
static ThreadPoolJob * getCurrentThreadPoolJob()
If the calling thread is being invoked inside a runJob() method, this will return the ThreadPoolJob t...
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently being run by a thread.
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Tries to remove all jobs from the pool.
JobStatus
These are the values that can be returned by the runJob() method.
Used to receive callbacks for thread exit calls.
Definition: juce_Thread.h:184
bool threadShouldExit() const
Checks whether the thread has been told to stop running.
Encapsulates a thread.
Definition: juce_Thread.h:46
ThreadPoolJob(const String &name)
Creates a thread pool job object.
~ThreadPool()
Destructor.
void addListener(Thread::Listener *)
Add a listener to this thread job which will receive a callback when signalJobShouldExit was called o...
Holds a resizable array of primitive or copy-by-value objects.
Definition: juce_Array.h:59
int size() const noexcept
Returns the current number of elements in the array.
Definition: juce_Array.h:219
virtual void exitSignalSent()=0
Called if Thread::signalThreadShouldExit was called.
indicates that the job has finished and can be removed from the pool.
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
If the given job is in the queue, this will move it to the front so that it is the next one to be exe...
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Returns a list of the names of all the jobs currently running or queued.
bool setThreadPriorities(int newPriority)
Changes the priority of all the threads.
ObjectClass * add(ObjectClass *newObject) noexcept
Appends a new object to the end of the array.
An array designed for holding objects.
String getJobName() const
Returns the name of this job.
A task that is executed by a ThreadPool object.
void signalJobShouldExit()
Calling this will cause the shouldExit() method to return true, and the job should (if it's been impl...
static Thread *JUCE_CALLTYPE getCurrentThread()
Finds the thread object that is currently running.
ThreadPool()
Creates a thread pool with one thread per CPU core.
void setJobName(const String &newName)
Changes the job's name.
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Adds a job to the queue.
A set of threads that will run a list of jobs.
Automatically locks and unlocks a mutex object.
ThreadPoolJob * getJob(int index) const noexcept
Returns one of the jobs in the queue.
void remove(int indexToRemove)
Removes an element from the array.
Definition: juce_Array.h:724
bool wait(int timeOutMilliseconds) const
Suspends the execution of this thread until either the specified timeout period has elapsed...
indicates that the job would like to be called again when a thread is free.
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Tries to remove a job from the pool.
static int getNumCpus() noexcept
Returns the number of logical CPU cores.
void add(String stringToAdd)
Appends a string at the end of the array.
bool contains(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently queued or running.
static uint32 getMillisecondCounter() noexcept
Returns the number of millisecs since a fixed event (usually system startup).
Definition: juce_Time.cpp:226