libkdepim

weaver.cpp

00001 /* -*- C++ -*-
00002 
00003    This file implements the Weaver, Job and Thread classes.
00004 
00005    $ Author: Mirko Boehm $
00006    $ Copyright: (C) 2004, Mirko Boehm $
00007    $ Contact: mirko@kde.org
00008          http://www.kde.org
00009          http://www.hackerbuero.org $
00010    $ License: LGPL with the following explicit clarification:
00011          This code may be linked against any version of the Qt toolkit
00012          from Troll Tech, Norway. $
00013 
00014 */
00015 
00016 extern "C" {
00017 #include <signal.h>
00018 }
00019 
00020 #include <qevent.h>
00021 #include <qapplication.h>
00022 
00023 #include "weaver.h"
00024 
00025 namespace KPIM {
00026 namespace ThreadWeaver {
00027 
00028     bool Debug = true;
00029     int DebugLevel = 2;
00030 
00031     Job::Job (QObject* parent, const char* name)
00032         : QObject (parent, name),
00033           m_finished (false),
00034           m_mutex (new QMutex (true) ),
00035           m_thread (0)
00036     {
00037     }
00038 
00039     Job::~Job()
00040     {
00041     }
00042 
00043     void Job::lock()
00044     {
00045         m_mutex->lock();
00046     }
00047 
00048     void Job::unlock()
00049     {
00050         m_mutex->unlock();
00051     }
00052 
00053     void Job::execute(Thread *th)
00054     {
00055         m_mutex->lock();
00056         m_thread = th;
00057         m_mutex->unlock();
00058 
00059         run ();
00060 
00061         m_mutex->lock();
00062         setFinished (true);
00063         m_thread = 0;
00064         m_mutex->unlock();
00065     }
00066 
00067     Thread *Job::thread ()
00068     {
00069         QMutexLocker l (m_mutex);
00070         return m_thread;
00071     }
00072 
00073     bool Job::isFinished() const
00074     {
00075         QMutexLocker l (m_mutex);
00076         return m_finished;
00077     }
00078 
00079     void Job::setFinished(bool status)
00080     {
00081         QMutexLocker l (m_mutex);
00082         m_finished = status;
00083     }
00084 
00085     void Job::processEvent (Event *e)
00086     {
00087         switch ( e->action() )
00088         {
00089             case Event::JobStarted:
00090                 emit ( started() );
00091                 break;
00092             case Event::JobFinished:
00093                 emit ( done() );
00094                 break;
00095             case Event::JobSPR:
00096                 emit ( SPR () );
00097                 m_wc->wakeOne ();
00098                 break;
00099             case Event::JobAPR:
00100                 emit ( APR () );
00101                 //  no wake here !
00102                 break;
00103             default:
00104                 break;
00105         }
00106     }
00107 
00108     void Job::triggerSPR ()
00109     {
00110         m_mutex->lock ();
00111         m_wc = new QWaitCondition;
00112         m_mutex->unlock ();
00113 
00114         thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
00115         m_wc->wait ();
00116 
00117         m_mutex->lock ();
00118         delete m_wc;
00119         m_wc = 0;
00120         m_mutex->unlock ();
00121     }
00122 
00123     void Job::triggerAPR ()
00124     {
00125         m_mutex->lock ();
00126         m_wc = new QWaitCondition;
00127         m_mutex->unlock ();
00128 
00129         thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this);
00130         m_wc->wait ();
00131     }
00132 
00133     void Job::wakeAPR ()
00134     {
00135         QMutexLocker l(m_mutex);
00136         if ( m_wc!=0 )
00137         {
00138             m_wc->wakeOne ();
00139             delete m_wc;
00140             m_wc = 0;
00141         }
00142     }
00143 
00144     const int Event::Type = QEvent::User + 1000;
00145 
00146     Event::Event ( Action action, Thread *thread, Job *job)
00147         : QCustomEvent ( type () ),
00148           m_action (action),
00149           m_thread (thread),
00150           m_job (job)
00151     {
00152     }
00153 
00154     const int Event::type ()
00155     {
00156         return Type;
00157     }
00158 
00159     Thread* Event::thread () const
00160     {
00161         if ( m_thread != 0)
00162         {
00163             return m_thread;
00164         } else {
00165             return 0;
00166         }
00167     }
00168 
00169     Job* Event::job () const
00170     {
00171         return m_job;
00172     }
00173 
00174     Event::Action Event::action () const
00175     {
00176         return m_action;
00177     }
00178 
00179     unsigned int Thread::sm_Id;
00180 
00181     Thread::Thread (Weaver *parent)
00182         : QThread (),
00183           m_parent ( parent ),
00184           m_id ( makeId() )
00185     {
00186     }
00187 
00188     Thread::~Thread()
00189     {
00190     }
00191 
00192     unsigned int Thread::makeId()
00193     {
00194         static QMutex mutex;
00195         QMutexLocker l (&mutex);
00196 
00197         return ++sm_Id;
00198     }
00199 
00200     const unsigned int Thread::id() const
00201     {
00202         return m_id;
00203     }
00204 
00205     void Thread::run()
00206     {
00207         Job *job = 0;
00208 
00209         post ( Event::ThreadStarted );
00210 
00211         while (true)
00212         {
00213             debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
00214 
00215             job = m_parent->applyForWork ( this, job );
00216 
00217             if (job == 0)
00218             {
00219                 break;
00220             } else {
00221                 post ( Event::JobStarted, job );
00222                 job->execute (this);
00223                 post ( Event::JobFinished, job );
00224             }
00225         }
00226 
00227         post (        Event::ThreadExiting );
00228     }
00229 
00230     void Thread::post (Event::Action a, Job *j)
00231     {
00232         m_parent->post ( a, this, j);
00233     }
00234 
00235     void Thread::msleep(unsigned long msec)
00236     {
00237         QThread::msleep(msec);
00238     }
00239 
00240     Weaver::Weaver(QObject* parent, const char* name,
00241                    int inventoryMin, int inventoryMax)
00242         : QObject(parent, name),
00243           m_active(0),
00244           m_inventoryMin(inventoryMin),
00245           m_inventoryMax(inventoryMax),
00246           m_shuttingDown(false),
00247           m_running (false),
00248           m_suspend (false),
00249           m_mutex ( new QMutex(true) )
00250     {
00251         lock();
00252 
00253         for ( int count = 0; count < m_inventoryMin; ++count)
00254         {
00255             Thread *th = new Thread(this);
00256             m_inventory.append(th);
00257             // this will idle the thread, waiting for a job
00258             th->start();
00259 
00260             emit (threadCreated (th) );
00261         }
00262 
00263         unlock();
00264     }
00265 
00266     Weaver::~Weaver()
00267     {
00268         lock();
00269 
00270         debug ( 1, "Weaver dtor: destroying inventory.\n" );
00271 
00272         m_shuttingDown = true;
00273 
00274         unlock();
00275 
00276         m_jobAvailable.wakeAll();
00277 
00278         // problem: Some threads might not be asleep yet, just finding
00279         // out if a job is available. Those threads will suspend
00280         // waiting for their next job (a rare case, but not impossible).
00281         // Therefore, if we encounter a thread that has not exited, we
00282         // have to wake it again (which we do in the following for
00283         // loop).
00284 
00285         for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
00286         {
00287             if ( !th->finished() )
00288             {
00289                 m_jobAvailable.wakeAll();
00290                 th->wait();
00291             }
00292 
00293             emit (threadDestroyed (th) );
00294             delete th;
00295 
00296         }
00297 
00298         m_inventory.clear();
00299 
00300         delete m_mutex;
00301 
00302         debug ( 1, "Weaver dtor: done\n" );
00303 
00304     }
00305 
00306     void Weaver::lock()
00307     {
00308         debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
00309                 ( m_mutex->locked() ? "locked" : "not locked" ) );
00310         m_mutex->lock();
00311     }
00312 
00313     void Weaver::unlock()
00314     {
00315         m_mutex->unlock();
00316 
00317         debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
00318                 ( m_mutex->locked() ? "locked" : "not locked" ) );
00319     }
00320 
00321     int Weaver::threads () const
00322     {
00323         QMutexLocker l (m_mutex);
00324         return m_inventory.count ();
00325     }
00326 
00327     void Weaver::enqueue(Job* job)
00328     {
00329         lock();
00330 
00331         m_assignments.append(job);
00332         m_running = true;
00333 
00334         unlock();
00335 
00336         assignJobs();
00337     }
00338 
00339     void Weaver::enqueue (QPtrList <Job> jobs)
00340     {
00341         lock();
00342 
00343         for ( Job * job = jobs.first(); job; job = jobs.next() )
00344         {
00345             m_assignments.append (job);
00346         }
00347 
00348         unlock();
00349 
00350         assignJobs();
00351     }
00352 
00353     bool Weaver::dequeue ( Job* job )
00354     {
00355         QMutexLocker l (m_mutex);
00356         return m_assignments.remove (job);
00357     }
00358 
00359     void Weaver::dequeue ()
00360     {
00361         QMutexLocker l (m_mutex);
00362         m_assignments.clear();
00363     }
00364 
00365     void Weaver::suspend (bool state)
00366     {
00367         lock();
00368 
00369         if (state)
00370         {
00371             // no need to wake any threads here
00372             m_suspend = true;
00373             if ( m_active == 0 && isEmpty() )
00374             {   //  instead of waking up threads:
00375                 post (Event::Suspended);
00376             }
00377         } else {
00378             m_suspend = false;
00379             // make sure we emit suspended () even if all threads are sleeping:
00380             assignJobs ();
00381             debug (2, "Weaver::suspend: queueing resumed.\n" );
00382         }
00383 
00384         unlock();
00385     }
00386 
00387     void Weaver::assignJobs()
00388     {
00389         m_jobAvailable.wakeAll();
00390     }
00391 
00392     bool Weaver::event (QEvent *e )
00393     {
00394         if ( e->type() >= QEvent::User )
00395         {
00396 
00397             if ( e->type() == Event::type() )
00398             {
00399                 Event *event = (Event*) e;
00400 
00401                 switch (event->action() )
00402                 {
00403                     case Event::JobFinished:
00404                         if ( event->job() !=0 )
00405                         {
00406                             emit (jobDone (event->job() ) );
00407                         }
00408                         break;
00409                     case Event::Finished:
00410                         emit ( finished() );
00411                         break;
00412                     case Event::Suspended:
00413                         emit ( suspended() );
00414                         break;
00415                     case Event::ThreadSuspended:
00416                         if (!m_shuttingDown )
00417                         {
00418                             emit (threadSuspended ( event->thread() ) );
00419                         }
00420                         break;
00421                     case Event::ThreadBusy:
00422                         if (!m_shuttingDown )
00423                         {
00424                             emit (threadBusy (event->thread() ) );
00425                         }
00426                         break;
00427                     default:
00428                         break;
00429                 }
00430 
00431                 if ( event->job() !=0 )
00432                 {
00433                     event->job()->processEvent (event);
00434                 }
00435             } else {
00436                 debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
00437             }
00438             return true;
00439         } else {
00440             // others - please make sure we are a QObject!
00441             return QObject::event ( e );
00442         }
00443     }
00444 
00445     void Weaver::post (Event::Action a, Thread* t, Job* j)
00446     {
00447         Event *e = new Event ( a, t, j);
00448         QApplication::postEvent (this, e);
00449     }
00450 
00451     bool Weaver::isEmpty() const
00452     {
00453         QMutexLocker l (m_mutex);
00454         return  m_assignments.count()==0;
00455     }
00456 
00457     Job* Weaver::applyForWork(Thread *th, Job* previous)
00458     {
00459         Job *rc = 0;
00460         bool lastjob = false;
00461         bool suspended = false;
00462 
00463         while (true)
00464         {
00465             lock();
00466 
00467             if (previous != 0)
00468             {   // cleanup and send events:
00469                 --m_active;
00470 
00471                 debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
00472                         "%i active jobs left.\n",
00473                         queueLength(), m_active );
00474 
00475                 if ( m_active == 0 && isEmpty() )
00476                 {
00477                     lastjob = true;
00478                     m_running = false;
00479                     post (Event::Finished);
00480                     debug ( 3, "Weaver::applyForWork: last job.\n" );
00481                 }
00482 
00483                 if (m_active == 0 && m_suspend == true)
00484                 {
00485                     suspended = true;
00486                     post (Event::Suspended);
00487                     debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
00488                 }
00489 
00490                 m_jobFinished.wakeOne();
00491             }
00492 
00493             previous = 0;
00494 
00495             if (m_shuttingDown == true)
00496             {
00497                 unlock();
00498 
00499                 return 0;
00500             } else {
00501                 if ( !isEmpty() && m_suspend == false )
00502                 {
00503                     rc = m_assignments.getFirst();
00504                     m_assignments.removeFirst ();
00505                     ++m_active;
00506 
00507                     debug ( 3, "Weaver::applyForWork: job assigned, "
00508                             "%i jobs in queue (%i active).\n",
00509                             m_assignments.count(), m_active );
00510                     unlock();
00511 
00512                     post (Event::ThreadBusy, th);
00513 
00514                     return rc;
00515                 } else {
00516                     unlock();
00517 
00518                     post (Event::ThreadSuspended, th);
00519                     m_jobAvailable.wait();
00520                 }
00521             }
00522         }
00523     }
00524 
00525     int Weaver::queueLength()
00526     {
00527         QMutexLocker l (m_mutex);
00528         return m_assignments.count();
00529     }
00530 
00531     bool Weaver::isIdle () const
00532     {
00533         QMutexLocker l (m_mutex);
00534         return isEmpty() && m_active == 0;
00535     }
00536 
00537     void Weaver::finish()
00538     {
00539         while ( !isIdle() )
00540         {
00541             debug (2, "Weaver::finish: not done, waiting.\n" );
00542             m_jobFinished.wait();
00543         }
00544         debug (1, "Weaver::finish: done.\n\n\n" );
00545     }
00546 
00547 }
00548 }
00549 
00550 #include "weaver.moc"
KDE Home | KDE Accessibility Home | Description of Access Keys