Shapejam

Trying to follow Buckminster Fuller's example in life, one step at a time to bring you brain scrapings for a new millennium.

Parallel Programming with Parallels

07 Apr 2013 - Nic Ho Chee

I've been a Microsoft Windows user for a while, and after some mishaps with traditional Windows laptops I ended up migrating to running Windows on a MacBook using Boot Camp. Recently some of my hardware was obsoleted on Windows 7, so I decided to go the whole hog and try OSX for my daily needs, with Windows running as a virtual machine through Parallels for any programming tasks needing Visual Studio. After using this setup for a few months I was surprised by how quickly it ran my compiled programs; I wasn't noticing the difference in responsiveness I was expecting, so I thought I'd benchmark a known problem to get a sense of how fast virtualisation actually was...

My initial idea was to benchmark a parallelisable algorithm under a variety of implementations on Windows and OSX to get a feel for the difference in running times between the native(-ish) OSX and virtualised Windows implementations. I settled on a simple Kohonen neural network implementation as it's eminently parallelisable and I'd be able to quickly scrape some data to run through it. I tested implementations using the following patterns/APIs:

- a naive, single-threaded implementation (Windows and Mac OSX)
- a naive optimised, single-threaded implementation (Windows and Mac OSX)
- Microsoft's Parallel Patterns Library (Windows)
- OpenCL (Mac OSX)
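
The per-sample step that all of these implement is the usual Kohonen update, Wv(s + 1) = Wv(s) + Θ(u, v, s) α(s)(D(t) - Wv(s)): find the node whose weights best match the incoming sample, then pull every node's weights towards it, scaled by a neighbourhood function and a decaying learning coefficient. As a rough sketch over hypothetical flat arrays (not the actual classes listed further down), a single step looks something like this:

#include <vector>
#include <limits>

/// Hypothetical flat-array sketch of one Kohonen training step; the
/// neighbourhood function Θ(u, v, s) is passed in as a callable.
void KohonenStep(
	const std::vector<double>& sample,		// D(t), one training input
	std::vector<double>& nodeWeights,		// Wv, nodeCount * weightCount values
	const unsigned int nodeCount,
	const unsigned int weightCount,
	const double alpha,						// learning coefficient α(s)
	double (*neighbourhood)( unsigned int winner, unsigned int node ) )
{
	// Find the best matching node: smallest Euclidean distance to the sample.
	unsigned int winner = 0;
	double minDistance = std::numeric_limits<double>::max();

	for (unsigned int v = 0; v < nodeCount; ++v)
	{
		double distance = 0.0;

		for (unsigned int i = 0; i < weightCount; ++i)
		{
			double d = sample[i] - nodeWeights[v * weightCount + i];
			distance += d * d;
		}

		if (distance < minDistance)
		{
			minDistance = distance;
			winner = v;
		}
	}

	// Update every node: Wv(s + 1) = Wv(s) + Θ(u, v, s) α(s)(D(t) - Wv(s)).
	// Each node's update is independent of the others, which is what makes
	// the algorithm so easy to spread across threads or GPU work items.
	for (unsigned int v = 0; v < nodeCount; ++v)
	{
		double theta = neighbourhood( winner, v );

		for (unsigned int i = 0; i < weightCount; ++i)
		{
			double& w = nodeWeights[v * weightCount + i];
			w += theta * alpha * (sample[i] - w);
		}
	}
}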

The Results...

The table below shows the results, averaged over 5 runs, for 2D Kohonen networks with 10x10 and 100x100 nodes against a training set of 320 groups of 7 greyscale values from 0x00 to 0xFF. All timings are in milliseconds:

                                           10 x 10 nodes   100 x 100 nodes
Naive Implementation, Windows              56.25306 ms     4774.4 ms
Naive Optimised Implementation, Windows    2.35398 ms      168.0156 ms
Parallel Patterns Implementation, Windows  4.26502 ms      251.52 ms
Naive Implementation, Mac OSX              71.952 ms       7018.11 ms
Naive Optimised Implementation, Mac OSX    1.8228 ms       104.171 ms
OpenCL Implementation, Mac OSX             23.0954 ms      24.6672 ms

A few things jump out from this:

- The OpenCL implementation barely slows down between the 10x10 and 100x100 networks, so it scales far better than any of the CPU implementations once the node count gets large.
- The Parallel Patterns implementation is slower than the optimised naive implementation on Windows at both network sizes.
- The naive implementation runs faster under virtualised Windows than natively on OSX, yet the optimised build is quicker on OSX, which suggests compiler differences matter more here than any virtualisation overhead.

How It Was Done

In overview, the training algorithms were split into a common base class and derived classes which carried out the various test permutation specialisations. The code was put together in a relatively quick-and-dirty fashion so I could prototype the various implementations in a decent time; I would need to look at cleaner implementations with more error checking if I went on to use this code for anything else. Here is the base training class:

//
// Trainer.h
// Simple trainer base class.
//

#pragma once

#include "Network.h"
#include "Distance.h"
#include "WeightVector.h"
#include "TrainingSet.h"

#define D_NUM_TRAINING_ITERATIONS	1000

class Trainer
{
	/// Reference to the node setup.
	Network* m_network;

	/// Current step in the training set.
	unsigned int m_step;

	/// Total number of iterations to run.
	unsigned int m_iterations;

	/// Get a neighbourhood scalar value based on distance to the winner.
	virtual double 
		GetNeighbourhoodValue( 
			Node<Weight<double>> const& winner, 
			Node<Weight<double>> const& neighbour ) = 0; 

	/// Calculate a simple distance check from a given position.
	virtual double 
		CalculateDistance( 
			WeightVector<Weight<double>> const& weights, 
			const int x, 
			const int y );

protected:

	/// Get the current node setup.
	Network*
		GetNetwork( void );
	
	/// Get the total number of iterations for this trainer
	const unsigned int 
		GetIterations( void ) const;

	/// Get a simple learning coefficient which decreases with some 
	/// function as the number of steps increases.
	virtual const double 
		GetLearningCoefficient( void ) const;
	
public:

	/// Constructor which takes the node setup and number of iterations 
	/// we're expecting to run.
	Trainer( 
		Network* network, 
		const unsigned int iterations = D_NUM_TRAINING_ITERATIONS );

	/// Destructor.
	~Trainer( void );

	/// Run a complete set of iterations as training steps.
	virtual void 
		Train( 
			TrainingSet const* trainingSet ) = 0;

	/// Run a single iteration of training.
	virtual void 
		Iterate( 
			TrainingSet const* trainingSet ) = 0;
};

...and here is the derived class for the naive implementation:

//
// SimpleTrainer.h
// Simple, very dumb training algorithm for a Kohonen network
// Wv(s + 1) = Wv(s) + Θ(u, v, s) α(s)(D(t) - Wv(s))
//

#pragma once

#include "Trainer.h"
#include "TrainingSet.h"
#include "PerformanceCounter.h"

/// Rough and ready Kohonen network trainer.
class SimpleTrainer : public Trainer
{
protected: 

	/// Get a simple neighbourhood scalar value based on distance to the winner.
	virtual double 
		GetNeighbourhoodValue( 
			Node<Weight<double>> const& winner, 
			Node<Weight<double>> const& neighbour );

	/// Calculate the winning node closest to the incoming set of weights.
	virtual Node<Weight<double>>* 
		CalculateBestMatchingNode( 
			WeightVector<Weight<double>> const& weights );

	/// Calculate the next step value after finding a winning node.
	void CalculateIncrement( 
		const WeightVector<Weight<double>>& weights, 
		const Node<Weight<double>>& winner );

public:

	/// Constructor, takes the node network and the number of iterations to run for.
	SimpleTrainer( 
		Network* network, 
		const unsigned int iterations );
	
	/// Run a complete set of iterations as training steps.
	virtual void 
		Train( 
			TrainingSet const* trainingSet );

	/// Run a single iteration of training.
	virtual void 
		Iterate( 
			TrainingSet const* trainingSet );
};
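
The PerformanceCounter class included above isn't listed in this post; from the way it is constructed at the top of Train() and Iterate() it behaves as a scope timer, reporting the elapsed time when it goes out of scope. A minimal sketch of that kind of helper (an assumption using std::chrono, not the actual class) would be:

//
// PerformanceCounter.h
// Hypothetical RAII scope timer, assumed to be roughly what the real
// PerformanceCounter does: time from construction to destruction and
// print the elapsed milliseconds.
//

#pragma once

#include <chrono>
#include <iostream>
#include <string>

class PerformanceCounter
{
	/// Label built from the file, function and tag passed in.
	std::string m_label;

	/// Time at which this counter was constructed.
	std::chrono::high_resolution_clock::time_point m_start;

public:

	PerformanceCounter(
		const char* file,
		const char* function,
		const char* tag ) :
			m_label( std::string( file ) + " " + function + " " + tag ),
			m_start( std::chrono::high_resolution_clock::now() )
	{
	}

	~PerformanceCounter( void )
	{
		std::chrono::duration<double, std::milli> elapsed =
			std::chrono::high_resolution_clock::now() - m_start;

		std::cout << m_label << ": " << elapsed.count() << " ms" << std::endl;
	}
};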

The Parallel Patterns based implementation is pretty simple, and whilst there is some locking in there it doesn't look like that is the cause of its tardiness compared to the other implementations. There may be specific programming practices needed to get decent performance out of it which aren't immediately obvious from reading the MSDN overview and API documentation, which is a shame, as you would hope a scheduler would handle this in a similar manner to GPGPU programming. I've sketched a lock-free alternative to the spin lock using concurrency::combinable after the listing below.

// ParallelTrainer.cpp
// 

#include "stdafx.h"
#include "ParallelTrainer.h"

#include <ppl.h>
#include <atomic>
#include <thread>

using namespace concurrency;

ParallelTrainer::ParallelTrainer( 
	Network* network, 
	const unsigned int iterations ) : 
		SimpleTrainer( network, iterations )
{
}

void 
	ParallelTrainer::Train( 
		TrainingSet const* trainingSet )
{
	PerformanceCounter counter( __FILE__, __FUNCTION__, "" );

	Trainer::Train( trainingSet );

	for (unsigned int i = 0; i < GetIterations(); ++i)
	{
		Iterate( trainingSet );
	}
}

void 
	ParallelTrainer::Iterate( 
		TrainingSet const* trainingSet )
{
	PerformanceCounter counter( __FILE__, __FUNCTION__, "" );

	unsigned int count = trainingSet->GetCount();

	for (unsigned int i = 0; i < count; ++i)
	{
		WeightVector<Weight<double>> const& weights = *(trainingSet->GetWeights(i));

		Node<Weight<double>>* node = CalculateBestMatchingNode( weights );
		CalculateIncrement( weights, *node );		
	}
}

Node<Weight<double>>* 
	ParallelTrainer::CalculateBestMatchingNode( 
		WeightVector<Weight<double>> const& weights )
{
	unsigned int nodeCount = GetNetwork()->GetCount();
	double minDistance = DBL_MAX;
	Node<Weight<double>>* winner = NULL;
	std::atomic_flag lock = ATOMIC_FLAG_INIT;

	parallel_for((unsigned int) 0, nodeCount, [&](unsigned int i){
		Node<Weight<double>>* node = GetNetwork()->GetNode( i );
		double distance = node->CalculateDistance( weights );

		while (lock.test_and_set(std::memory_order_acquire));
		if ( distance < minDistance )
		{
			winner = node;
			minDistance = distance;
		}
		lock.clear(std::memory_order_release); 
	});

	return ( winner );
}

void 
	ParallelTrainer::CalculateIncrement( 
		const WeightVector<Weight<double>>& weights, 
		const Node<Weight<double>>& winner )
{
	unsigned int nodeCount = GetNetwork()->GetCount();
	
	parallel_for((unsigned int) 0, nodeCount, [&](unsigned int i){

		Node<Weight<double>>* node = GetNetwork()->GetNode( i );
		double scale = GetNeighbourhoodValue( winner, *node ) * GetLearningCoefficient();
		const unsigned int count = node->GetWeights().Count();

		for ( unsigned int j = 0; j < count; ++j )
		{
			// Pull this node's weights towards the winner's weights:
			// Wv(s + 1) = Wv(s) + Θ(u, v, s) α(s)(Wwinner(s) - Wv(s)).
			double current = node->GetWeights()[j]->Get();
			double value = current + ( winner.GetWeights()[j]->Get() - current ) * scale;
			node->GetWeights()[j]->Set( value );
		}
	});
}
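
One thing I didn't benchmark is whether the spin lock in CalculateBestMatchingNode is worth removing. A lock-free variant could keep a per-thread minimum with concurrency::combinable and merge the results after the parallel_for. A rough sketch of that approach (the method name is made up and isn't part of the project above; it assumes <utility> is available for std::pair):

Node<Weight<double>>*
	ParallelTrainer::CalculateBestMatchingNodeLockFree(
		WeightVector<Weight<double>> const& weights )
{
	unsigned int nodeCount = GetNetwork()->GetCount();

	typedef std::pair<double, Node<Weight<double>>*> Candidate;

	// Each worker thread accumulates its own best (distance, node) pair,
	// so no shared state is touched inside the loop body.
	combinable<Candidate> best( [](){
		return Candidate( DBL_MAX, (Node<Weight<double>>*) NULL );
	});

	parallel_for((unsigned int) 0, nodeCount, [&](unsigned int i){
		Node<Weight<double>>* node = GetNetwork()->GetNode( i );
		double distance = node->CalculateDistance( weights );

		Candidate& localBest = best.local();

		if ( distance < localBest.first )
		{
			localBest.first = distance;
			localBest.second = node;
		}
	});

	// Merge the per-thread minima into a single winner.
	Candidate winner = best.combine(
		[]( const Candidate& a, const Candidate& b ) -> Candidate {
			return ( a.first < b.first ) ? a : b;
		});

	return ( winner.second );
}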

Finally, the OpenCL implementation is very much C-like, with data unpacked from OO C++ structures into flat memory arrays to be dumped into GPU memory. The header file:

//
//  ClTrainer.h
//  ParallelTest
//

#ifndef __ParallelTest__ClTrainer__
#define __ParallelTest__ClTrainer__

#pragma once

#include <iostream>
#include <OpenCL/opencl.h>

#include "SimpleTrainer.h"

/// OpenCL based Kohonen network trainer.
class ClTrainer : public SimpleTrainer
{
    /// Identifies a specific OpenCL device.
    cl_device_id        m_deviceId;
    
    /// An OpenCL context which groups commands, programs etc.
    cl_context         m_context;
    
    /// An ordered set of commands to carry out for various program objects.
    cl_command_queue    m_commandQueue;
    
    /// A given OpenCL program which can contain one or more kernels.
    cl_program          m_program;
    
    /// A kernel function which is used to find the node with the smallest distance given some input weights.
    cl_kernel           m_distanceKernel;
    
    /// A kernel function which is used to calculate the incremented state of the network.
    cl_kernel           m_incrementKernel;
    
    /// Is everything initialised and ready to use?
    bool                m_initialised;
    
public:
    
	/// Constructor, takes the node network and the number of iterations to run for.
	ClTrainer(
              Network* network,
              const unsigned int iterations );
	
	/// Destructor, tear down the clTrainer.
	~ClTrainer( void );
    
	/// Run a complete set of iterations as training steps.
	virtual void
        Train(
              TrainingSet const* trainingSet );
    
	/// Run a single iteration of training.
	virtual void
        Iterate(
                TrainingSet const* trainingSet );
};

#endif /* defined(__ParallelTest__ClTrainer__) */

...and the implementation of the header is shown below. Unpacking data to be sent to the GPU completely breaks the object-oriented nature of the code; the hope is that, with a unified memory architecture and some form of PS2-style DMA packet stitching, a machine of the future would be able to make use of GPGPU without having to unpack and repack memory in order to get at a large number of parallel processes.

//
//  ClTrainer.cpp
//  ParallelTest
//

#include "ClTrainer.h"
#include <assert.h>

const char* ProgramKernels = "\n" \
"__kernel void kohonenDistance(                                                                 \n" \
"   __global double* weights,                                                                   \n" \
"   __global double* nodeWeights,                                                               \n" \
"   __global double* distance,                                                                  \n" \
"   unsigned int weightCount)                                                                   \n" \
"{                                                                                              \n" \
"   int nodeWeightId = get_global_id(0);                                                        \n" \
"   double euclidDistance = 0.0;                                                                \n" \
"   for (unsigned int i = 0; i < weightCount; ++i)                                              \n" \
"   {                                                                                           \n" \
"       double a = weights[i] - nodeWeights[weightCount*nodeWeightId+i];                        \n" \
"       euclidDistance += (a * a);                                                              \n" \
"   }                                                                                           \n" \
"   euclidDistance = sqrt(euclidDistance);                                                      \n" \
"   distance[nodeWeightId] = euclidDistance;                                                    \n" \
"}                                                                                              \n" \
"__kernel void kohonenIncrement(                                                                \n" \
"   __global double* winner,                                                                    \n" \
"   __global double* nodeWeights,                                                               \n" \
"   __global int* nodePositions,                                                                \n" \
"   double learningCoefficient,                                                                 \n" \
"   int winnerX,                                                                                \n" \
"   int winnerY,                                                                                \n" \
"   unsigned int weightCount)                                                                   \n" \
"{                                                                                              \n" \
"   int nodeWeightId = get_global_id(0);                                                        \n" \
"   const double oneOverSix = 1.0/6.0;                                                          \n" \
"   const double oneOverThree = 1.0/3.0;                                                        \n" \
"   double x = winnerX - nodePositions[nodeWeightId*2];                                         \n" \
"   double y = winnerY - nodePositions[nodeWeightId*2+1];                                       \n" \
"   double euclidDistance = sqrt( x*x + y*y );                                                  \n" \
"   double scale = 0.0;                                                                         \n" \
"   if ( euclidDistance < 6.0 )                                                                 \n" \
"   {                                                                                           \n" \
"       euclidDistance *= oneOverSix;                                                           \n" \
"       scale = 1.0 - pow( euclidDistance, oneOverThree );                                      \n" \
"   }                                                                                           \n" \
"   for (unsigned int i = 0; i < weightCount; ++i)                                              \n" \
"   {                                                                                           \n" \
"       double a = winner[i] - nodeWeights[weightCount*nodeWeightId+i];                         \n" \
"       a *= scale * learningCoefficient;                                                       \n" \
"       nodeWeights[weightCount*nodeWeightId+i] += a;                                            \n" \
"   }                                                                                           \n" \
"}                                                                                              \n" \
"\n";

ClTrainer::ClTrainer(
                     Network* network,
                     const unsigned int iterations ) :
                        SimpleTrainer( network, iterations )
{
    int result = 0;
    bool useGpu = true; //If it fails it will crash the GPU, test with the CPU before doing this!
    m_initialised = false;
    
    m_program = 0;
    m_distanceKernel = 0;
    m_incrementKernel = 0;
    m_commandQueue = 0;
    m_context = 0;
    m_deviceId = 0;
    
    // Get the required device id, we need this to setup the rest of the CL system.
    result = clGetDeviceIDs(NULL, useGpu ? CL_DEVICE_TYPE_GPU : CL_DEVICE_TYPE_CPU, 1, &m_deviceId, NULL);
    
    if (result == CL_SUCCESS)
    {
        m_context = clCreateContext(0, 1, &m_deviceId, NULL, NULL, &result);
        
        if (m_context)
        {
            m_commandQueue = clCreateCommandQueue(m_context, m_deviceId, 0, &result);
            m_program = clCreateProgramWithSource(m_context, 1, (const char**)&ProgramKernels, NULL, &result);
            
            if (m_program)
            {
                result = clBuildProgram(m_program, 0, NULL, NULL, NULL, NULL);
                
                if (result != CL_SUCCESS)
                {
                    char error[4096];
                    size_t length;
                    
                    clGetProgramBuildInfo(m_program, m_deviceId, CL_PROGRAM_BUILD_LOG, sizeof(error), error, &length);
                    std::cout << "Error, build failed: " << error;
                }
                else
                {
                    m_distanceKernel = clCreateKernel(m_program, "kohonenDistance", &result);
                    m_incrementKernel = clCreateKernel(m_program, "kohonenIncrement", &result);
                                                       
                    if (m_distanceKernel && m_incrementKernel)
                        m_initialised = true;
                }
            }
        }
        
    }
}

ClTrainer::~ClTrainer( void )
{
    if (m_program)
        clReleaseProgram(m_program);
    
    if (m_distanceKernel)
        clReleaseKernel(m_distanceKernel);
    
    if (m_incrementKernel)
        clReleaseKernel(m_incrementKernel);
    
    if (m_commandQueue)
        clReleaseCommandQueue(m_commandQueue);
    
    if (m_context)
        clReleaseContext(m_context);
    
    if (m_deviceId)
        clReleaseDevice(m_deviceId);
}

void
    ClTrainer::Train(
                     TrainingSet const* trainingSet )
{
    PerformanceCounter counter( __FILE__, __FUNCTION__, "" );
    
    assert(m_initialised);
    
	Trainer::Train( trainingSet );
    
	for (unsigned int i = 0; i < GetIterations(); ++i)
	{
		Iterate( trainingSet );
	}
}

void
    ClTrainer::Iterate(
                       TrainingSet const* trainingSet )
{
	PerformanceCounter counter( __FILE__, __FUNCTION__, "" );
    
	unsigned int count = trainingSet->GetCount();
    
    // Assumption that all trainingSet inputs have the same count... at least check we have elements in
    // the training set and there are some weights there.
    
    assert(count > 0 && trainingSet->GetWeights(0)->Count() > 0);
    unsigned int nodeCount = GetNetwork()->GetCount();
    unsigned int weightCount = static_cast<unsigned int>(trainingSet->GetWeights(0)->Count());
    
    double* rawWeights = new double [weightCount];
    double* rawNodeWeights = new double [weightCount * nodeCount];
    double* rawDistance = new double [nodeCount];
    int* rawNodePositions = new int [nodeCount * 2];
    
    cl_mem weights = clCreateBuffer(m_context, CL_MEM_READ_ONLY,  sizeof(double) * weightCount, NULL, NULL);
    cl_mem nodeWeights = clCreateBuffer(m_context, CL_MEM_READ_ONLY,  sizeof(double) * weightCount * nodeCount, NULL, NULL);
    cl_mem distance = clCreateBuffer(m_context, CL_MEM_WRITE_ONLY, sizeof(double) * nodeCount, NULL, NULL);
    
    cl_mem nodeRwWeights = clCreateBuffer(m_context, CL_MEM_READ_WRITE,  sizeof(double) * weightCount * nodeCount, NULL, NULL);
    cl_mem nodePositions = clCreateBuffer(m_context, CL_MEM_READ_ONLY, sizeof(int) * 2 * nodeCount, NULL, NULL);
    
    // Store the node weights.  We're going to reuse these in both kernels so only have to store these once
    // and write back once computation is finished.
    for (unsigned int i = 0; i < nodeCount; ++i)
    {
        Node<Weight<double>>* node = GetNetwork()->GetNode( i );
        std::vector<Weight<double>*> const& a = node->GetWeights().GetWeights();
        
        for (unsigned int j = 0; j < weightCount; ++j)
            rawNodeWeights[i * weightCount + j] = a[j]->Get();
        
        rawNodePositions[i*2] = node->GetX();
        rawNodePositions[i*2+1] = node->GetY();
    }
    
    // The localWorkSize and globalWorkSize control the threads which are spawned.  In this case we have
    // a 1D weight set of N elements to "multiply" through a 2D weight set of N-by-Number of Nodes which
    // generates a set of distances with count equal to the number of Nodes.  These are usable in both
    // kernels :).
    
    size_t localWorkSize[3];
    size_t globalWorkSize[3];
    
    localWorkSize[0] = nodeCount;
    localWorkSize[1] = 0;
    localWorkSize[2] = 0;
    globalWorkSize[0] = nodeCount;
    globalWorkSize[1] = 0;
    globalWorkSize[2] = 0;
    
	for (unsigned int i = 0; i < count; ++i)
	{
		std::vector<Weight<double>*> const& inputWeights = (trainingSet->GetWeights(i))->GetWeights();
        
        // Run the first kernel.
        
        for (unsigned int j = 0; j < weightCount; ++j)
            rawWeights[j] = inputWeights[j]->Get();
        
        int result = 0;
        result = clEnqueueWriteBuffer(m_commandQueue, weights, CL_TRUE, 0, sizeof(double) * weightCount, rawWeights, 0, NULL, NULL);
        result = clEnqueueWriteBuffer(m_commandQueue, nodeWeights, CL_TRUE, 0, sizeof(double) * weightCount * nodeCount, rawNodeWeights, 0, NULL, NULL);
        	
        result = 0;
        result  = clSetKernelArg(m_distanceKernel, 0, sizeof(cl_mem), &weights);
        result |= clSetKernelArg(m_distanceKernel, 1, sizeof(cl_mem), &nodeWeights);
        result |= clSetKernelArg(m_distanceKernel, 2, sizeof(cl_mem), &distance);
        result |= clSetKernelArg(m_distanceKernel, 3, sizeof(unsigned int), &weightCount);
        
        // Do the work.
        result = clEnqueueNDRangeKernel(m_commandQueue, m_distanceKernel, 1, NULL, globalWorkSize, localWorkSize, 0, NULL, NULL);
        
        // Get the data back.
        result = clEnqueueReadBuffer(m_commandQueue, distance, CL_TRUE, 0, sizeof(double) * nodeCount, rawDistance, 0, NULL, NULL);
        
        // Now we have a set of distances and need to find the smallest distance.
        unsigned int winner = 0;
        double minDistance = std::numeric_limits<double>::max();
        
        for (unsigned int j = 0; j < nodeCount; ++j)
        {
            if (rawDistance[j] < minDistance)
            {
                minDistance = rawDistance[j];
                winner = j;
            }
        }
        
        // Prepare for running the second kernel.
        
        Node<Weight<double>>* winningNode = GetNetwork()->GetNode( winner );
        std::vector<Weight<double>*> const& winningWeights = winningNode->GetWeights().GetWeights();
        int winnerX = winningNode->GetX();
        int winnerY = winningNode->GetY();
        double learningCoefficient = GetLearningCoefficient();
        
        // Store the winning weights
        for (unsigned int j = 0; j < weightCount; ++j)
            rawWeights[j] = winningWeights[j]->Get();
        
        result = clEnqueueWriteBuffer(m_commandQueue, weights, CL_TRUE, 0, sizeof(double) * weightCount, rawWeights, 0, NULL, NULL);
        result = clEnqueueWriteBuffer(m_commandQueue, nodeRwWeights, CL_TRUE, 0, sizeof(double) * weightCount * nodeCount, rawNodeWeights, 0, NULL, NULL);
        result = clEnqueueWriteBuffer(m_commandQueue, nodePositions, CL_TRUE, 0, sizeof(int) * 2 * nodeCount, rawNodePositions, 0, NULL, NULL);
        
        // Bind the increment kernel arguments in the order declared by kohonenIncrement:
        // winner weights, read-write node weights, node positions, learning coefficient,
        // winner position and the weight count.
        result = 0;
        result  = clSetKernelArg(m_incrementKernel, 0, sizeof(cl_mem), &weights);
        result |= clSetKernelArg(m_incrementKernel, 1, sizeof(cl_mem), &nodeRwWeights);
        result |= clSetKernelArg(m_incrementKernel, 2, sizeof(cl_mem), &nodePositions);
        result |= clSetKernelArg(m_incrementKernel, 3, sizeof(double), &learningCoefficient);
        result |= clSetKernelArg(m_incrementKernel, 4, sizeof(int), &winnerX);
        result |= clSetKernelArg(m_incrementKernel, 5, sizeof(int), &winnerY);
        result |= clSetKernelArg(m_incrementKernel, 6, sizeof(unsigned int), &weightCount);
        
        // Do the work.
        result = clEnqueueNDRangeKernel(m_commandQueue, m_incrementKernel, 1, NULL, globalWorkSize, localWorkSize, 0, NULL, NULL);
        
        // Get the data back.
        result = clEnqueueReadBuffer(m_commandQueue, nodeRwWeights, CL_TRUE, 0, sizeof(double) * weightCount * nodeCount, rawNodeWeights, 0, NULL, NULL);
	}

    // Writeback the weights..
    for (unsigned int i = 0; i < nodeCount; ++i)
    {
        Node<Weight<double>>* node = GetNetwork()->GetNode( i );
        
        for (unsigned int j = 0; j < weightCount; ++j)
            *(node->GetWeights()[j]) = rawNodeWeights[i * weightCount + j];
    }

    delete [] rawWeights;
    delete [] rawNodeWeights;
    delete [] rawDistance;
    delete [] rawNodePositions;
    
    clReleaseMemObject(weights);
    clReleaseMemObject(nodeWeights);
    clReleaseMemObject(distance);
    clReleaseMemObject(nodeRwWeights);
    clReleaseMemObject(nodePositions);
}
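
As mentioned above, the quick-and-dirty nature of this code means almost none of the OpenCL return codes are checked. If I took this further, the first tidy-up would be something along the lines of the hypothetical helper below (not part of the code above), wrapping each cl* call so a failing call and its error code are reported immediately:

//
// ClCheck.h
// Hypothetical OpenCL error-checking helper.
//

#pragma once

#include <iostream>
#include <cstdlib>
#include <OpenCL/opencl.h>

#define CL_CHECK( expression )                                              \
    do                                                                      \
    {                                                                       \
        cl_int _error = (expression);                                       \
        if (_error != CL_SUCCESS)                                           \
        {                                                                   \
            std::cerr << "OpenCL call failed with " << _error << ": "       \
                      << #expression << std::endl;                          \
            std::exit( EXIT_FAILURE );                                      \
        }                                                                   \
    } while (0)

// Usage, e.g. in ClTrainer::Iterate:
//   CL_CHECK( clEnqueueWriteBuffer( m_commandQueue, weights, CL_TRUE, 0,
//       sizeof(double) * weightCount, rawWeights, 0, NULL, NULL ) );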