Multi-threading support for filters working on data types other than images

From OTBWiki
Revision as of 12:59, 4 March 2011 by SebastienDinot (Talk | contribs) (Vandalism reverted)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to: navigation, search

Introduction

Seamless multi-threading support is one of the great feature of the Orfeo ToolBox inherited from ITK. Take any image processing algorithm able to work on a sub-part of its output knowing its entire input, implement the following method:

virtual void ThreadedGenerateData(RegionType& outputRegionForThread,int threadId);

instead of the GenerateData() method and that is all. For more complex algorithm you might need to do some extra mono-threaded processing in the BeforeThreadedGenerateData() and AfterThreadedGenerateData(), but the important point is that the whole multithreading framework is set up.

Unfortunately, this simple framework only applies for images, and if you are writing algorithm working on other types of input (such as lists of objects, or graphs for instance), you are condemned to mono-threading processing, even on your brand new 8-cores toy.

Goal of the tutorial

This tutorial aims at explaining how multi-threading can be set up for algorithm working on other data types. This tutorial is only intended for skilled and experienced developers because of the coding tricks an concepts it describes.

Starting point

Let assume that we already have the following things properly set:

  • An undefined data type called Splitable, which can be split in some way. A splitted part of Splitable is defined by the type Part. For instance, if we were using a list, Part could be a pair of two indices defining a sub-part of the list.
  • A tremendous filter called TremendousSplitableToSplitableFilter, deriving from itk::ProcessObject and working on Splitable data type using an implementation of virtual voidGenerateData();.

Altogether it should look like this:

template < class TInputSplitable, class TOutputSplitable>
class TremendousSplitableToSplitableFilter
: public itk::ProcessObject
{
public:
typedef TOutputSplitable OutputSplitableType;
...
protected:
TremendousSplitableToSplitableFilter()
{
this->itk::ProcessObject::SetNthOutput(OutputSplitableType::New());
}
virtual void GenerateData();
...
}

Step one : Modifying the class header

First there are a few things to add in the class definition, in the protected section.

The static ThreaderCallback function

We need to define a static function which will be called by each thread instance:

static ITK_THREAD_RETURN_TYPE ThreaderCallback( void *arg );

The ThreadStruct structure

We also need a structure in which we will store a pointer to our filter so that in the static function defined above, we will be able to access to our filter instance.

struct ThreadStruct
{
  Pointer Filter;
};

New *GenerateData() methods

Now, in addition to the already existing GenerateData() method, we will define three new ones:

virtual void BeforeThreadedGenerateData();
virtual void AfterThreadedGenerateData();
virtual void ThreadedGenerateData(const Part& outputPartForThread,int threadId);

The first two methods allows you to implement additional processing before or after the multi-threaded section. The ThreadedGenerateData() method run the algorithm to produce the part outputPartForThread of the whole output.

The SplitData() method

We need one more new method, called SplitData(). This method will set the part reference argument to the part of the output will be produced by thread threadId out of threadCount. The part. It returns a number denoting the total number of splits the algorithm can split the output data into.

int SplitData(int threadId, int threadCount, Part& part);


Step two : Getting into the code

Now that the class header has been properly modified, we will focus on the class methods implementation.

Writing ThreadedGenerateData()

The first thing we need to do is to move the body of the old virtual void GenerateData() method into the body of the new virtual void ThreadedGenerateData(const Part& outputPartForThread,int threadId), and change slightly the implementation so that it is now producing only the subpart outputPartForThread of the output.

Rewriting GenerateData()

Once the body of GenerateData() has been moved to ThreadedGenerateData() and modified, we can rewrite it this way. First, we call the BeforeThreadedGenerateData():

this->BeforeThreadedGenerateData();

Then we create an instance of ThreadStruct and intialize the filter pointer using this

// Set up the multithreaded processing
ThreadStruct str;
str.Filter = this;

we set the multithreader number of threads.

// Setting up multithreader
this->GetMultiThreader()->SetNumberOfThreads(this->GetNumberOfThreads());

Then we tell the multi-threader which static method it should call (function pointer to ThreaderCallback), and we also pass by reference the ThreadStruct instance we built. This structure will be used to access the filter in the static function ThreaderCallback.

this->GetMultiThreader()->SetSingleMethod(this->ThreaderCallback, &str);

Trigger the multi-threaded execution

// multithread the execution
this->GetMultiThreader()->SingleMethodExecute();

Call the AfterThreadedGenerateData()

this->AfterThreadedGenerateData();

Writing the ThreaderCallback static function

We are almost done, there is only one thing left to do. We have to write the ThreaderCallback static function. First, we will retrieve the ThreadStruct instance that we passed to the MultiThreader in the previous section.

ThreadStruct *str = (ThreadStruct *)(((itk::MultiThreader::ThreadInfoStruct *)(arg))->UserData);

In a similar way, we retrieve the current threadId and the total number of threads available.

int threadId = ((itk::MultiThreader::ThreadInfoStruct *)(arg))->ThreadID;
int threadCount = ((itk::MultiThreader::ThreadInfoStruct *)(arg))->NumberOfThreads;

Then we will use the SplitData() to determine the part of Splitable we need to process.

Part subPart;
int total = str->Filter->SplitData(threadId,threadCount,subPart);

If we did not already process the entire data, we can call the ThreadedGenerateData() on the given subpart.

if (threadId < total)
  {
    str->Filter->ThreadedGenerateData(subPart,threadId);
  }

Last but not least, we return a value indicating that everything went fine for this thread.

return ITK_THREAD_RETURN_VALUE;

Conclusion

We have shown how to bring multi-threading support to these algorithms working on a different kind of data. For a real implementation example, one might look at the following files:

http://hg.orfeo-toolbox.org/OTB/file/762c93afef46/Code/SpatialReasoning/otbPolygonListToRCC8GraphFilter.h
http://hg.orfeo-toolbox.org/OTB/file/762c93afef46/Code/SpatialReasoning/otbPolygonListToRCC8GraphFilter.txx