#ifndef PN_ASYNC_WORKER_H #define PN_ASYNC_WORKER_H 1 #include ////////////////////////////////////////////////////////////////////// //! @file async-worker.h //! @author Oliver 'kfs1' Smith //! @brief Headers for the async::workload_t classes. //! @see async //! @example async-Dispatch-example.cpp //! An example of using the async::dispatch_t class to //! offload work to a database. //! @example async-Worker-example.cpp //! An example of using the async::worker_t class for //! parallelization. //! @example async-StaticWorker-example.cpp //! An example of using the async::static_worker_t class //! for parallelization. // // Copyright (c) 2010, Oliver 'kfs1' Smith // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions // are met: // // - Redistributions of source code must retain the above copyright notice, // this list of conditions and the following disclaimer. // // - Redistributions in binary form must reproduce the above copyright notice, // this list of conditions and the following disclaimer in the documentation // and/or other materials provided with the distribution. // // - Neither the name of KingFisher Software nor the names of its contributors // may be used to endorse or promote products derived from this software // without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT // HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED // TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR // PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF // LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING // NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // /*! * @namespace async * @brief Message-passing asynchronous worker classes. * @author Oliver 'kfs1' Smith * * Overview * * The Async Workers library is a small collection of classes * that leverage ZeroMQ (http://www.zeromq.org/) and the * OpenThreads(*) library (http://openthreads.sourceforge.net/) * to provide a convenient means of implementing offloading * of asynchronous workloads. * * (* OpenThreads is used for the creation and destruction of * threads. If OpenThreads is not available for your * platform or not an option for your project, it will be * a trivial few-minutes work to make the WorkerPool use * your native API) * * It can also be used to implement a very efficient, Erlang- * style messaging-based form of parallelism, similar to the * "pragma task" feature of OpenMP 3.0. * * Async Workers creates a pool of worker threads, one for * each available CPU core. Each thread calls zmq_recv(), * and then waits in an OS-scheduler-friendly IO wait for * a message to arrive. * * Work is delivered to a thread as a pointer to an instance * of an object derived from one of the base classes, eliminating * copying and allowing data to remain "hot" in CPU cache if a * thread is available to process it immediately. * * Using Async Workers * * Encpsulate your work load in one of the following base * clases or see async::workload_t for the fundamental base * class if you need to derive something different. * * Implement the "Work()" function with the workload you want * doing; Use the Queue() function to dispatch the work to * a worker thread. * * By default workers use the delete method on themselves once * used, to override this default behavior, e.g. for a local * stack instance, overload the Destroy() member function. * * Retrieving results * * To collect results from workers or re-use the objects, use * the async::worker_t or async::static_worker_t class as * your base, and implement both the Work() and Result() virtual * functions. * * When the work has been executed, the worker thread will send * the object back where they will be retrieved when you call * async::GetResults(). * * @note async::GetResults() will block until all dispatched * returnable objects have been both executed, returned and * processed. This includes workloads that the worker pool * has not yet received. * * Base classes * * - async::dispatch_t * Provides a dispatch-only workload; assumes the object * is dynamically allocated via new and defaults to automatic * destruction of the object via delete in the Destroy() * operator. * * - async::worker_t * Encapsulates a workload such as a reduction which can be * executed in parallel but which must be returned to the * parent thread to complete execution (member Result()). * Assumes dynamic allocation via new and defaults to automatic * destruction of the object via delete in the Destroy() * operator. * * - async::static_worker_t * Provides an async::worker_t with a no-op Destroy() * operator suitable for static or stack-allocated usage. * * * Notes and caveats * * - Always try to marshall all the data your work will need * to operate on. * * - Avoid small workloads: While the work will likely be * executed on a separate CPU/core in parallel to the * originating code, but the workload has to more than * exceed the overhead of the message passing. * Aim for around 1000+ cpu cycles of work, not including * data preparation etc. * * - Use pointers with care lest someone else modify or, worse, * free the resource pointed to. * * - Avoid passing indirect references to data/items that are * not thread safe (such as a connection identifier in a * server process) or that requires a mutable lookup. On a * multi-core system it is likely to prove more efficient * to expend the extra cpu cycles packaging the data into * your workload object. * */ namespace async { // Functions //! Dispatch a worker instance for processing and deletion by a worker thread. //! workload_t::Queue(new MyWorker(...)) is the preferred method for //! dispatching background work. //! @warning Be aware that the worker object will call the "Destroy()" //! @warning member function to delete itself or perform custom behavior. //! @warning Work occurs in the background, so all operations MUST be thread-safe. //! @param[in] instance Pointer to the entity to process. extern void Queue(const class workload_t* const instance) ; //! Retrieve all currently pending results. Blocks the caller //! until all work loads that were marked to return results //! have completed and the results have been retrieved. //! @note Blocks until all dispatched returnable work //! has been completed, returned and processed. //! @return true/false whether any results were retrieved. extern bool GetResults() ; //! Get a count of how many results are pending. //! @return number of outstanding results expected. extern size_t PendingResults() ; class workload_t { public: // Constructor. workload_t() {} public: //! Dispatch this worker instance for processing and deletion by a worker thread. //! MyWorker* worker = new MyWorker(...) ; //! ... initialization ... //! worker->Queue() ; //! @see workload_t::Queue() void Queue() const { async::Queue(this) ; } public: //! Abstract virtual function to be implemented in derived class. //! All operations within must be thread-safe. virtual void Work() = 0 ; public: //! Virtual function for handling results returned to the parent //! thread. If your derived class needs to do something with the //! workload when it returns, use this function. //! @note Not constant because you are the owner. virtual void Result() {} public: //! Returns true/false whether the worker needs to be sent //! back to the parent thread. virtual bool HasResults() const { return false ; } public: //! Overload to not-call delete if you are using your own //! allocation. virtual void Destroy() const { delete const_cast(this) ; } } ; //! @class async::dispatch_t //! @see async::worker_t //! @see async::static_worker_t //! Describes a workload that does not need to return any results, //! that will be dispatched as a heap-allocated object. //! Derived classes must implement the "virtual void Work()" member function. //! @warning Assumes heap allocation with 'new'. //! @warning Instances will attempt to 'delete' themseleves upon //! completion. If this behavior is not desired, overload //! the 'Destroy' member function. class dispatch_t : public workload_t { public: //! Constructor. dispatch_t() : workload_t() {} } ; //! @class worker_t //! @see async::FireAndForget //! @see async::static_worker_t //! @see async::GetResults() //! @see async::PendingResults() //! Describes a workload that returns results to be consumed and //! processed by the caller, that will be dispatched as a heap- //! allocated object. //! Once dispatched, each async::worker_t increments the pending //! result counter. To retrieve completed workloads, call //! async::GetResults(). //! Implement member function "void Work()" with your //! workload, and implement "void Result()" with your //! code for receiving the returned results. //! @warning Assumes heap-allocation with 'new'. //! @warning Instances will attempt to 'delete' themseleves upon //! completion. If this behavior is not desired, use //! async::static_worker_t instead. class worker_t : public workload_t { public: //! Constructor worker_t() : workload_t() {} public: virtual bool HasResults() const { return true ; } } ; //! @class static_worker_t //! @see async::FireAndForget //! @see async::worker_t //! @see async::GetResults() //! @see async::PendingResults() //! Describes a workload that returns results to be consumed and //! processed by the caller, that will be dispatched as a local //! object. //! Once dispatched, each async::static_worker_t increments the //! pending result counter. To retrieve completed workloads, call //! async::GetResults(). //! Implement member function "void Work()" with your //! workload, and implement "void Result()" with your //! code for receiving the returned results. //! @warning Assumes local (stack) allocation, and does not //! attempt to self-delete. //! @warning If this behavior is not desired, see the //! async::worker_t class instead. class static_worker_t : public workload_t { public: //! Constructor static_worker_t() : workload_t() {} public: virtual bool HasResults() const { return true ; } public: virtual void Destroy() const { } } ; } // namespace async #endif // PN_ASYNC_WORKER_H