//! @file async-StaticWorker-example.cpp
//! @brief ZeroMQ / async::static_worker_t example
//! @author Oliver 'kfs1' Smith <oliver@kfs.org>
//! @note Uses clock_gettime which, under linux, requires -lrt

#include "async-worker.h"		// The async:: classes.

#include <math.h>				// For sin, cos, etc
#include <stdint.h>				// For uint64_t.
#include <stdio.h>				// For printf
#include <stdlib.h>				// For rand
#include <time.h>				// Timing functions.

#include <algorithm>			// For std::min.
#include <vector>				// For std::vector.


//! Element type for the values being crunched and for the sums accumulated from them.
typedef uint64_t Number ;

//! Container of Number values; its iterators delimit the range each worker processes.
typedef std::vector< Number > Numbers ;

//! @class CrunchNumbersRange

//! Class that demonstrates work-sharing through
//! message passing with async worker.
//!
//! static_worker_t does not delete the worker
//! objects in Result(), making it suitable for
//! use as a local variable.
//!
//! Crunches the range of numbers [m_start, m_end) in Work()
//! and accumulates a sum, which is then handed back to the
//! originator via the Result() function by adding it to
//! the Number that m_finalDestination points at.

class CrunchNumbersRange : public async::static_worker_t
{
	//! Constructor.
	//! Starts with a zero sum, value-initialized iterators and no
	//! destination; the caller is expected to assign m_start, m_end
	//! and m_finalDestination before the worker is run.
public: CrunchNumbersRange()
			: static_worker_t()
			, m_start()
			, m_end()
			, m_sum(0)
			, m_finalDestination(NULL)
			{}

	//! Execution function.
	//! Describes the work to be executed by the worker: folds every
	//! element of [m_start, m_end) into m_sum.
	//! @note An element equal to 0 makes the value/value term NaN, so
	//!       the range should only contain non-zero numbers.
public:	virtual void Work()
		{
			for ( Numbers::iterator it = m_start ; it != m_end ; ++it )
			{
				// The calculation is deliberately non-trivial to increase
				// processing time.
				// fabs() is used instead of abs(): with only the C headers
				// included, abs() may resolve to int abs(int) and silently
				// truncate its floating-point operand.
				double value = *it ;
				value = (value * 321.0234) + sin((float)value) + cos(std::min((float)value, (float)1.0)) + fabs((float)value / (float)value) ;
				int x = (int)value % 2000000 ;
				value = value + (float) x ;
				x = (int)value % 3000000 ;
				value = (value + (float) x) / 3 ;
				x = (int)value % 4000000 ;
				value += x ;
				// The right-hand side is floating point; the compound
				// assignment converts the new total back to Number.
				m_sum += (int)(value / (2 * 321)) % 999 + fabs(sin((float)value / 100)) + fabs(cos((float)value / 251)) ;
			}
		}

	//! Result processor.
	//! Note: Result()s are executed <em> in serial </em> when you
	//! call async::GetResults().
	//! Does nothing if no destination has been assigned.
	//! @note Not const.
public: virtual void Result()
	{
		// Add our calculated value to the accumulator.
		if ( m_finalDestination != NULL )
			*m_finalDestination += m_sum ;
	}

	//! Copy of the iterators delimiting the range to crunch: [m_start, m_end).
public: Numbers::iterator m_start, m_end ;
	//! Value calculated by Work().
private: Number m_sum ;
	//! Pointer to the accumulator that Result() adds m_sum to.
public: Number* m_finalDestination ;

} ;

//! Calculate the whole milliseconds elapsed between two timespecs.
//! @param start Timestamp taken at the beginning of the interval.
//! @param end Timestamp taken at the end of the interval.
//! @return Elapsed time in milliseconds, truncated; 0 if end is not later than start.
static unsigned long time_delta(const timespec& start, const timespec& end)
{
	// Subtract the seconds before scaling to microseconds, and do it in
	// 64 bits, so a 32-bit time_t cannot overflow when multiplied by 1000000.
	const int64_t seconds = static_cast<int64_t>(end.tv_sec) - static_cast<int64_t>(start.tv_sec) ;
	// Each nanosecond field is truncated to microseconds before subtracting.
	const int64_t subUsec = static_cast<int64_t>(end.tv_nsec / 1000) - static_cast<int64_t>(start.tv_nsec / 1000) ;
	const int64_t usec = (seconds * 1000000) + subUsec ;

	// A reversed interval would otherwise wrap to a huge unsigned value.
	if ( usec <= 0 )
		return 0 ;

	return static_cast<unsigned long>(usec / 1000) ;
}

int main(int argc, const char* const argv[])
{
	// First, we create a large array of numbers.
	static const size_t NumberOfElements = 20000000 ;
	printf("Creating vector of %u numbers\n", NumberOfElements) ;

	// Storage for calculated results.
	Number serialResult = 0, parallelResult = 0 ;
	// Storage for array of numbers.
	Numbers numbers ;

	numbers.resize(NumberOfElements) ;
	for ( size_t i = 0 ; i < NumberOfElements ; ++i )
	{
		numbers[i] = (rand() & 65535) + 1 ;
	}

	// Create a local instance so we can call Work directly.
	CrunchNumbersRange serial ;
	serial.m_start = numbers.begin() ;
	serial.m_end = numbers.end() ;
	serial.m_finalDestination = &serialResult ;

	// Timers for each step.
	timespec timeStart, timeEnd ;
	unsigned long serialMsec, dispatchMsec, collectMsec ;

	printf("Calculating sum serially...\n") ;
	clock_gettime(CLOCK_MONOTONIC, &timeStart) ;

		serial.Work() ;
		serial.Result() ;

	clock_gettime(CLOCK_MONOTONIC, &timeEnd) ;
	serialMsec = time_delta(timeStart, timeEnd) ;
	printf("Done. Calculated sum as %lu. Took %lums.\n", (unsigned long int)serialResult, serialMsec) ;

	Numbers::iterator it = numbers.begin() ;
	static const size_t GroupSize = 8192 ;

	std::vector<CrunchNumbersRange> workloads ;
	workloads.resize((NumberOfElements + GroupSize - 1) / GroupSize) ;

	printf("Dispatching %u workloads to background threads in groups of %u numbers per workload.\n", NumberOfElements / GroupSize, GroupSize) ;

	clock_gettime(CLOCK_MONOTONIC, &timeStart) ;

		std::vector<CrunchNumbersRange>::iterator workload ;
		for ( workload = workloads.begin() ; workload != workloads.end() ; ++workload )
		{
			Numbers::iterator end = std::min(it + GroupSize, numbers.end()) ;
			(*workload).m_start = it ;
			(*workload).m_end = end ;
			(*workload).m_finalDestination = &parallelResult ;
			async::Queue(&(*workload)) ;
			it = end ;
		}

	clock_gettime(CLOCK_MONOTONIC, &timeEnd) ;
	dispatchMsec = time_delta(timeStart, timeEnd) ;
	printf("Done. Took %lums.\n", dispatchMsec) ;
	printf("Collecting results.\n") ;
	clock_gettime(CLOCK_MONOTONIC, &timeStart) ;

		async::GetResults() ;

	clock_gettime(CLOCK_MONOTONIC, &timeEnd) ;
	collectMsec = time_delta(timeStart, timeEnd) ;

	printf("Done. Calculated sum as %lu. Took %lums.\n", (unsigned long int)parallelResult, collectMsec) ;

	unsigned long parallelMsec = dispatchMsec + collectMsec ;
	printf("Total time for parallel run: %lums.\n", parallelMsec) ;

	// If it's not even 25% faster ... something is wrong.
	if ( parallelMsec >= serialMsec * 75 / 100 )
	{
		if ( parallelMsec > serialMsec )
			printf("Parallel run took %lums longer than serial.", parallelMsec - serialMsec) ;
		printf("Try adjusting GroupSize to increase parallel performance.\n") ;
	}
	else
	{
		printf("Parallel run took %lums less than serial version. Yay :)\n", serialMsec - parallelMsec) ;
	}

	return 0 ;
}

