// An example of using the async::worker_t class for parallelization.
// ZeroMQ / async::worker_T example. // Oliver 'kfs1' Smith <oliver@kfs.org> // Uses clock_gettime which, under linux, requires -lrt // Because the example contains a lot of additional // code (for benchmarking) I have highlighted the key // sections of the main() function with lines of ///s. #include "async-worker.h" // The Asyc:: classes. #include <vector> // For std::vector. #include <stdio.h> // For printf #include <stdlib.h> // For rand #include <math.h> // For sin, cos, etc #include <time.h> // Timing functions. #include <stdint.h> // For uint64_t. typedef uint64_t Number ; typedef std::vector< Number > Numbers ; class CrunchNumbersRange : public async::worker_t { public: CrunchNumbersRange(const Numbers::iterator& start, const Numbers::iterator& end, Number* finalDestination) : worker_t() , m_start(start) , m_end(end) , m_sum(0) , m_finalDestination(finalDestination) {} public: virtual void Work() { Numbers::iterator it ; for ( it = m_start ; it != m_end ; ++it ) { // I'm making the calculation non-trivial to increase // processing time. double value = *it ; value = (value * 321.0234) + sin((float)value) + cos(std::min((float)value, (float)1.0)) + abs((float)value / (float)value) ; int x = (int)value % 2000000 ; value = value + (float) x ; x = (int)value % 3000000 ; value = (value + (float) x) / 3 ; x = (int)value % 4000000 ; value += x ; m_sum += (int)(value / (2 * 321)) % 999 + abs(sin((float)value / 100)) + abs(cos((float)value / 251)) ; } } public: virtual void Result() { // Add our calculated value to the accumulator. 
*m_finalDestination += m_sum ; } private: Numbers::iterator m_start, m_end ; private: mutable Number m_sum ; private: Number* m_finalDestination ; } ; #ifdef SHORT_VERSION int main(int argc, const char* const argv[]) { static const size_t NumberOfElements = 20000000 ; static const size_t GroupSize = 8192 ; Numbers numbers ; numbers.resize(NumberOfElements) ; for ( size_t i = 0 ; i < NumberOfElements ; ++i ) { numbers[i] = (rand() & 65535) + 1 ; } uint64_t parallelResult = 0 ; // Dispatch groups of numbers to workers. Numbers::iterator it = numbers.begin() ; do { Numbers::iterator end = std::min(it + GroupSize, numbers.end()) ; async::Queue(new CrunchNumbersRange(it, end, ¶llelResult)) ; it = end ; } while ( it != numbers.end() ) ; // Wait for all the results, calling Result() on each // returned object to produce a total. async::GetResults() ; printf("Done. Calculated sum as %lu.\n", (unsigned long int)parallelResult) ; return 0 ; } #endif // Calculate miliseconds between two timespecs. static unsigned long time_delta(const timespec& start, const timespec& end) { uint64_t usec = ((end.tv_sec * 1000000) + end.tv_nsec / 1000) - ((start.tv_sec * 1000000) + start.tv_nsec / 1000) ; return (unsigned long)(usec / 1000) ; } int main(int argc, const char* const argv[]) { // First, we create a large array of numbers. static const size_t NumberOfElements = 20000000 ; printf("Creating vector of %u numbers\n", NumberOfElements) ; // Storage for calculated results. Number serialResult = 0, parallelResult = 0 ; // Storage for array of numbers. Numbers numbers ; numbers.resize(NumberOfElements) ; for ( size_t i = 0 ; i < NumberOfElements ; ++i ) { numbers[i] = (rand() & 65535) + 1 ; } // Create a local instance so we can call Work directly. CrunchNumbersRange serial(numbers.begin(), numbers.end(), &serialResult) ; // Timers for each step. 
timespec timeStart, timeEnd ; unsigned long serialMsec, dispatchMsec, collectMsec ; printf("Calculating sum serially...\n") ; clock_gettime(CLOCK_MONOTONIC, &timeStart) ; serial.Work() ; serial.Result() ; clock_gettime(CLOCK_MONOTONIC, &timeEnd) ; serialMsec = time_delta(timeStart, timeEnd) ; printf("Done. Calculated sum as %lu. Took %lums.\n", (unsigned long int)serialResult, serialMsec) ; Numbers::iterator it = numbers.begin() ; static const size_t GroupSize = 8192 ; printf("Dispatching %u workloads to background threads in groups of %u numbers per workload.\n", NumberOfElements / GroupSize, GroupSize) ; clock_gettime(CLOCK_MONOTONIC, &timeStart) ; // Dispatch groups of numbers to workers. do { Numbers::iterator end = std::min(it + GroupSize, numbers.end()) ; async::Queue(new CrunchNumbersRange(it, end, ¶llelResult)) ; it = end ; } while ( it != numbers.end() ) ; clock_gettime(CLOCK_MONOTONIC, &timeEnd) ; dispatchMsec = time_delta(timeStart, timeEnd) ; printf("Done. Took %lums.\n", dispatchMsec) ; printf("Collecting results.\n") ; clock_gettime(CLOCK_MONOTONIC, &timeStart) ; // Wait for all the results, calling Result() on each // returned object to produce a total. async::GetResults() ; clock_gettime(CLOCK_MONOTONIC, &timeEnd) ; collectMsec = time_delta(timeStart, timeEnd) ; printf("Done. Calculated sum as %lu. Took %lums.\n", (unsigned long int)parallelResult, collectMsec) ; unsigned long parallelMsec = dispatchMsec + collectMsec ; printf("Total time for parallel run: %lums.\n", parallelMsec) ; // If it's not even 25% faster ... something is wrong. if ( parallelMsec >= serialMsec * 75 / 100 ) { if ( parallelMsec > serialMsec ) printf("Parallel run took %lums longer than serial.", parallelMsec - serialMsec) ; printf("Try adjusting GroupSize to increase parallel performance.\n") ; } else { printf("Parallel run took %lums less than serial version. Yay :)\n", serialMsec - parallelMsec) ; } return 0 ; }