//////////////////////////////////////////////////////////////////////
// Copyright (c) 2010, Oliver 'kfs1' Smith <oliver@kfs.org>
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// - Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// - Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// - Neither the name of KingFisher Software nor the names of its contributors
// may be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
// TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////
// Implements the async::workload_t (background work execution process)
// using "ZeroMQ" to pass pointers to marshalled work objects.
//
// User derives work loads from subclasses of the async::workload_t class.
//

#include <zmq.hpp>
#include "async-worker.h"
#include "OpenThreads/Thread"
#include <cassert>
#include <cstring>
#include <vector>

// ZeroMQ context shared by the worker pool and every worker thread in this file.
// NOTE(review): the argument is presumably the number of 0MQ I/O threads --
// confirm against the zmq.hpp version being built against.
static zmq::context_t zmqContext(1) ;
// Endpoint the pool binds its dispatch socket to; workers connect to it to receive work.
static const char zmqWorkSocketName[] = "inproc://async-work" ;
// Endpoint the pool binds its result socket to; workers connect to it to hand back finished work.
static const char zmqDoneSocketName[] = "inproc://async-done" ;

//! @internal
namespace async
{

//////////////////////////////////////////////////////////////////////
//! @internal Background worker thread. Each worker receives pointers
//! to async::workload_t objects from the work socket, calls Work() on
//! the object, and then either forwards the same message to the "done"
//! socket (when HasResults() was true) or calls Destroy() on the object.

class WorkerThread : public OpenThreads::Thread
{
public:
	//! @internal Nothing to set up here; the sockets are created inside
	//! run(), i.e. on the worker thread itself.
	WorkerThread() {}

public:
	//! @internal Body of the worker thread.
	//! The worker blocks in recv() until a message arrives, which
	//! avoids the overheads associated with mutex-protected queues.
	//!
	//! The loop ends when:
	//!  - recv() reports failure,
	//!  - an empty message (the pool's "stop" signal) is received, or
	//!  - a 0MQ call throws zmq::error_t (the binding reports errors
	//!    such as context termination by throwing).
	virtual void run()
	{
		try
		{
			zmq::socket_t inSocket(zmqContext, ZMQ_UPSTREAM) ;
			zmq::socket_t outSocket(zmqContext, ZMQ_DOWNSTREAM) ;
			inSocket.connect(zmqWorkSocketName) ;
			outSocket.connect(zmqDoneSocketName) ;

			while ( true )
			{
				zmq::message_t msg ;
				if ( !inSocket.recv(&msg, 0) )
				{
					return ;
				}
				// Empty message = stop running.
				if ( msg.size() == 0 || msg.data() == 0 )
				{
					return ;
				}

				// The payload is the raw bytes of a workload_t pointer.
				// Copy it out with memcpy rather than dereferencing the
				// buffer as a pointer-to-pointer: the message buffer is
				// not known to be suitably aligned, and a short message
				// must never be read past its end.
				async::workload_t* instance = NULL ;
				if ( msg.size() < sizeof(instance) )
					continue ;
				std::memcpy(&instance, msg.data(), sizeof(instance)) ;
				if ( instance == NULL )
					continue ;

				// Sample HasResults() before Work() runs, so the routing
				// decision cannot be affected by anything Work() changes.
				const bool hasResults = instance->HasResults() ;
				instance->Work() ;

				if ( hasResults )
				{
					// Hand the very same message (same pointer bytes)
					// back to the parent as the work-done reply.
					outSocket.send(msg, 0) ;
				}
				else
				{
					instance->Destroy() ;
				}
			}
		}
		catch ( const zmq::error_t& )
		{
			// A 0MQ failure (e.g. the context being shut down) ends this
			// worker quietly; letting the exception leave the thread's
			// entry point would otherwise bring the whole process down.
		}
	}
} ;

//////////////////////////////////////////////////////////////////////
//! @internal WorkerPool implements a pool of N worker threads to
//! which async::workload_t objects can be dispatched for execution.

class WorkerPool
{
public:
	//! @internal Creates a pool of N threads for work execution via a 0MQ socket.
	WorkerPool(size_t numThreads)
		: m_threads()
		, m_sendSocket(zmqContext, ZMQ_DOWNSTREAM)
		, m_recvSocket(zmqContext, ZMQ_UPSTREAM)
	{
		assert( numThreads > 0 ) ;

		const size_t maxThreads = OpenThreads::GetNumberOfProcessors() * 2 ;
		if ( numThreads > maxThreads )
			numThreads = maxThreads ;

		m_sendSocket.bind(zmqWorkSocketName) ;
		m_recvSocket.bind(zmqDoneSocketName) ;

		// Create threads
		for ( size_t i = 0 ; i < numThreads ; ++i )
		{
			m_threads.push_back(new WorkerThread) ;
			m_threads.back()->start() ;
		}
	}

	//! @internal Destructor cancels all remaining threads.
	~WorkerPool()
	{
		// Empty message tells a client to shut down.
		for ( size_t i = 0 ; i < m_threads.size() ; ++i )
		{
			zmq::message_t emptyMsg ;
			m_sendSocket.send(emptyMsg, ZMQ_NOBLOCK) ;
			OpenThreads::Thread::YieldCurrentThread() ;
		}
		// tell the threads to shut down.
		for ( size_t i = 0 ; i < m_threads.size() ; ++i )
		{
			m_threads[i]->cancel() ;
		}

		OpenThreads::Thread::YieldCurrentThread() ;
	}

private:
	std::vector<OpenThreads::Thread*> m_threads ;

private:
	zmq::socket_t m_sendSocket ;
	zmq::socket_t m_recvSocket ;
	size_t pendingResults ;

public:
	//! @internal Dispatch a workload object to the thread pool.
	//! This is done by simply Send()ing a pointer.
	//! @attention I tried sending &instancePtr instead of
	//! having to construct storage in the message,
	//! but it didn't work.
	bool Send(const async::workload_t* const instancePtr)
	{
		const bool expectResults = instancePtr->HasResults() ;

		// Send pointer to this instance to the worker pool for one of the threads
		// to pick up.
		zmq::message_t msg(sizeof(instancePtr)) ;
		*(const async::workload_t**)msg.data() = instancePtr ;
		if ( !m_sendSocket.send(msg, 0) )
			return false ;

		if ( expectResults )
			++pendingResults ;

		return true ;
	}

	//! @internal Receive result objects from workers that
	//! require parent-thread processing/completion.
	void Recv()
	{
		zmq::message_t msg ;
		if ( !m_recvSocket.recv(&msg, 0) )
			return ;

		// Note: Result is not const.
		async::workload_t* const instancePtr = *(async::workload_t**)(msg.data()) ;
		if ( instancePtr != NULL )
		{
			instancePtr->Result() ;
			instancePtr->Destroy() ;
		}
		--pendingResults ;
	}

	//! @internal Retrieve all outstanding result objects.
	bool GetResults()
	{
		if ( pendingResults == 0 )
			return false ;
		// Receive all outstanding results.
		while ( pendingResults > 0 )
		{
			Recv() ;
		}
		return true ;
	}

	//! @internal Get the count of pending result objects.
	size_t PendingResults() const
	{
		return pendingResults ;
	}
} ;

//////////////////////////////////////////////////////////////////////
//! @internal Accessor for the single, lazily created WorkerPool.
//! The pool is a function-local static: it is constructed on the first
//! call, asking for one thread per processor reported by
//! OpenThreads::GetNumberOfProcessors(), and the same instance is
//! returned on every later call.

static WorkerPool& getWorkerPool()
{
	static WorkerPool sharedPool(OpenThreads::GetNumberOfProcessors()) ;
	return sharedPool ;
}

//////////////////////////////////////////////////////////////////////
//! Queue a workload for background execution by Send()ing its pointer
//! through the shared worker pool; ZeroMQ takes care of delivering it
//! to one of the listening workers.
//! The success flag returned by WorkerPool::Send() is discarded, since
//! this function returns void.

void Queue(const async::workload_t* const workload)
{
	WorkerPool& pool = getWorkerPool() ;
	(void) pool.Send(workload) ;
}

//////////////////////////////////////////////////////////////////////
//! @internal Fetch all pending results. Because this uses stream IO,
//! it will block in a kernel-recognizable way, allowing more worker-
//! thread concurrency, providing a significant advantage over work-
//! stealing employed in systems such as OpenMP or Intel's TBB.

bool GetResults()
{
	return getWorkerPool().GetResults() ;
}

//////////////////////////////////////////////////////////////////////
//! @internal Get the number of responses that have not yet been consumed.

size_t PendingResults()
{
	return getWorkerPool().PendingResults() ;
}

} // namespace async

