Multi-threading in a RH Component

Introduction

A Redhawk component is itself a process with many threads. The most notable is the serviceFunction, which is normally responsible for the component's data processing. This post shows one way to increase a component's processing capacity by running multiple worker threads. Multi-threading fits many different design models; this component was designed so that many threads all operate on the same input packet. The source code for the entire component can be obtained by sending us a request.

Header File

Beginning with the header file:

class MultithreadExample_i : public MultithreadExample_base
{
    ENABLE_LOGGING
    public:
        MultithreadExample_i(const char *uuid, const char *label);
        ~MultithreadExample_i();
        int serviceFunction();
        void start() throw (CF::Resource::StartError, CORBA::SystemException);
        void stop() throw (CF::Resource::StopError, CORBA::SystemException);
        void threadedFunction(unsigned int ThreadIndex);

    protected:
        std::vector<boost::thread*> threadVector;
        std::vector<boost::mutex*> startMutexVector;
        std::vector<boost::mutex*> stopMutexVector;

        boost::barrier* threadBarrier;

        bool stopThreads;
        bulkio::InShortPort::dataTransfer *currentPacket;
};

The start() and stop() functions are overridden in order to start and stop all of the additional threads. The thread objects are stored in a vector, as are the mutexes used for synchronization. A barrier is included in this example to show its use, although it is not needed in all cases. A boolean signals to the threads that it's time to exit, and a pointer to the current packet is kept at the class level so that every thread can access it.

Start Function

The start function is overridden in order to start the threads, and then the base class function is called to start the serviceFunction as usual.

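void MultithreadExample_i::start() throw (CF::Resource::StartError, CORBA::SystemException){
  // Refuse to start twice: leftover threads mean stop() was never called.
  if(!threadVector.empty()){
    LOG_WARN(MultithreadExample_i, __FUNCTION__ << ": Thread Vector is not empty -- try calling stop(). Component NOT started.");
    return;
  }
  stopThreads = false;
  unsigned int numThreads = 4;
  // Create the barrier before any worker thread exists.
  threadBarrier = new boost::barrier(numThreads);
  for(unsigned int loop = 0; loop < numThreads; loop++){
    // Both mutexes start out locked: the worker blocks on its start mutex
    // until serviceFunction releases it, and serviceFunction blocks on the
    // stop mutex until the worker has finished the packet.
    startMutexVector.push_back(new boost::mutex());
    stopMutexVector.push_back(new boost::mutex());
    startMutexVector[loop]->lock();
    stopMutexVector[loop]->lock();
    // Pass the loop counter to the worker as its ThreadIndex.
    threadVector.push_back(new boost::thread(
      boost::bind(&MultithreadExample_i::threadedFunction, this, loop)));
  }
  // Start the serviceFunction thread as usual.
  MultithreadExample_base::start();
}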

First, a check makes sure the component isn’t already running and is in a known state before anything else happens. After that, the initialization begins. The most important part is where each thread is created; in this example it is passed a ThreadIndex, which it can use to know its place in the system. In this case it isn’t really needed, but there are a variety of cases (especially when debugging) where it can be useful.
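The idea of handing each thread its own index can be tried outside of a component. The following is a minimal sketch, not the component's code: it assumes C++11 and uses std::thread in place of boost::thread and boost::bind so that it builds without Boost, and the IndexedWorkers class is a hypothetical stand-in for the component.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for the component: each worker records its own index.
class IndexedWorkers {
public:
    explicit IndexedWorkers(std::size_t numThreads) : seen_(numThreads, -1) {}

    // Launch one thread per slot, passing the loop counter as the thread's
    // index, then wait for all of them to finish.
    void run() {
        std::vector<std::thread> threads;
        for (std::size_t i = 0; i < seen_.size(); ++i) {
            threads.emplace_back(&IndexedWorkers::work, this, i);
        }
        for (std::thread& t : threads) {
            t.join();
        }
    }

    const std::vector<int>& seen() const { return seen_; }

private:
    // Each thread writes only to its own slot, so no lock is needed here.
    void work(std::size_t index) { seen_[index] = static_cast<int>(index); }

    std::vector<int> seen_;
};
```

Constructing IndexedWorkers with a count of 4 and calling run() leaves each slot holding the index of the thread that wrote it, which is also a handy pattern for per-thread debug output.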

Stop Function

Skipping ahead a bit in the life cycle is the stop function. Since it is related to start, it is shown next:

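void MultithreadExample_i::stop() throw (CF::Resource::StopError, CORBA::SystemException){
  // Stop the serviceFunction thread first so that it is no longer using the
  // mutexes or the current packet while the workers are torn down.
  MultithreadExample_base::stop();

  stopThreads = true;
  for(unsigned int loop = 0; loop < threadVector.size(); loop++){
    // Release the worker so it can see stopThreads and return, then join it.
    startMutexVector[loop]->unlock();
    threadVector[loop]->join();
    // Unlock before deleting; a locked mutex must not be destroyed.
    stopMutexVector[loop]->unlock();
    delete(startMutexVector[loop]);
    delete(stopMutexVector[loop]);
    delete(threadVector[loop]);
  }
  threadVector.clear();
  startMutexVector.clear();
  stopMutexVector.clear();

  if(threadBarrier != NULL){
    delete(threadBarrier);
    threadBarrier = NULL;
  }
}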

The loop moves through the vector of threads, releasing and joining each one individually. Once that is complete, the remaining variables are cleared and reset to their defaults to prepare for another possible start call.
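The start, stop, start-again life cycle described here can be sketched in isolation. This is an illustration under assumptions, not the component's code: it uses C++11 std::thread and a std::atomic<bool> stop flag instead of the mutex handshake, and StoppableWorkers is a hypothetical class name.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical worker pool that can be started and stopped repeatedly.
class StoppableWorkers {
public:
    StoppableWorkers() : stop_(false) {}
    ~StoppableWorkers() { stop(); }

    // Returns false (and does nothing) if the workers are already running.
    bool start(unsigned int numThreads) {
        if (!threads_.empty()) {
            return false;
        }
        stop_ = false;
        for (unsigned int i = 0; i < numThreads; ++i) {
            threads_.emplace_back([this] {
                // Idle until told to exit; real work would go in this loop.
                while (!stop_.load()) {
                    std::this_thread::yield();
                }
            });
        }
        return true;
    }

    // Signal every worker, join each one, and reset for another start().
    void stop() {
        stop_ = true;
        for (std::thread& t : threads_) {
            t.join();
        }
        threads_.clear();
    }

    bool running() const { return !threads_.empty(); }

private:
    std::atomic<bool> stop_;
    std::vector<std::thread> threads_;
};
```

Clearing the thread vector at the end of stop() is what lets the empty-vector check in start() double as the "already running" test, the same role threadVector.empty() plays in the component.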

Thread Function

The thread function is where the majority of the processing now takes place. Each thread is told to start its operation on the current packet, and once it has completed, it goes back and waits to be told the next packet is ready.

void MultithreadExample_i::threadedFunction(unsigned int ThreadIndex){
  std::vector<short> outputData;
  while(true){
    // Block here until serviceFunction (or stop) releases this thread.
    startMutexVector[ThreadIndex]->lock();
    if(stopThreads == true){
      startMutexVector[ThreadIndex]->unlock();
      return;
    }
    // Process the shared packet: add this thread's index to every sample.
    outputData.resize(currentPacket->dataBuffer.size());
    for(unsigned int loop = 0; loop < outputData.size(); loop++){
      outputData[loop] = currentPacket->dataBuffer[loop] + ThreadIndex;
    }
    dataShort_out->pushPacket(outputData, currentPacket->T,
      currentPacket->EOS, currentPacket->streamID);
    // Wait until every worker has reached this point.
    threadBarrier->wait();
    // Tell serviceFunction that this thread is done with the packet.
    stopMutexVector[ThreadIndex]->unlock();
  }
}

The function loops until the stopThreads boolean tells it to exit. Until then, each pass grabs the current input packet and performs whatever operation is needed. In this case, the thread adds its index to each element of the packet and sends the result out. A barrier was added just to show how it can be used. If each thread needed to perform an additional step, the barrier could be used like this to guarantee that all of the threads are synchronized again before moving on. In this example it is not necessary that they be synced there.
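To make the barrier's purpose concrete, here is a small sketch of a two-step computation in which the second step is only safe once every thread has finished the first. It is written under assumptions: C++11 only, with a hand-rolled SimpleBarrier (a hypothetical helper, similar in spirit to boost::barrier) so that the example builds without Boost; twoStepSum is likewise a made-up name.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Minimal reusable barrier (hypothetical helper, not boost::barrier itself).
class SimpleBarrier {
public:
    explicit SimpleBarrier(std::size_t count)
        : threshold_(count), waiting_(0), generation_(0) {}

    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t gen = generation_;
        if (++waiting_ == threshold_) {
            // Last arrival: reset for reuse and release everyone.
            waiting_ = 0;
            ++generation_;
            cv_.notify_all();
        } else {
            cv_.wait(lock, [this, gen] { return gen != generation_; });
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    const std::size_t threshold_;
    std::size_t waiting_;
    std::size_t generation_;
};

// Step 1: every thread fills its own slot. Step 2 (after the barrier): every
// thread sums all slots, which is only safe once all step-1 writes are done.
std::vector<int> twoStepSum(std::size_t numThreads) {
    std::vector<int> stepOne(numThreads, 0);
    std::vector<int> totals(numThreads, 0);
    SimpleBarrier barrier(numThreads);
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < numThreads; ++i) {
        threads.emplace_back([&stepOne, &totals, &barrier, i] {
            stepOne[i] = static_cast<int>(i) + 1;
            barrier.wait();
            int sum = 0;
            for (int v : stepOne) {
                sum += v;
            }
            totals[i] = sum;
        });
    }
    for (std::thread& t : threads) {
        t.join();
    }
    return totals;
}
```

With four threads, every entry of the returned vector is 1 + 2 + 3 + 4 = 10. Without the barrier, a fast thread could sum the slots before a slow thread had written its own.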

Service Function

The serviceFunction at this point has very little to do. Its job is essentially to pull packets off the port and signal the threads that it is time to start their work. It then waits for them to finish and deletes the packet.

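int MultithreadExample_i::serviceFunction(){

  // Non-blocking read: a timeout of 0 returns immediately if no packet is queued.
  currentPacket = dataShort_in->getPacket(0);

  if (not currentPacket) {
    return NOOP;
  }

  if(!threadVector.empty()){
    // Release every worker to process the current packet.
    for(unsigned int loop = 0; loop < threadVector.size(); loop++){
      startMutexVector[loop]->unlock();
    }

    // Block until every worker has signaled that it is finished.
    for(unsigned int loop = 0; loop < threadVector.size(); loop++){
      stopMutexVector[loop]->lock();
    }
  }

  // All workers are done with the packet, so it is safe to free it.
  delete currentPacket;
  return NORMAL;
}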

The synchronization between the threads and the serviceFunction could be accomplished in a number of different ways using semaphores, barriers or mutexes. The dual-mutex approach was chosen because it shows the progression through the threading operations fairly explicitly. If you think there is a better or clearer way to do this, or have any ideas for future blog posts, please contact us.
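As one example of an alternative, the same "release all workers, then wait for all of them" handshake can be built from a single mutex and two condition variables. The sketch below is an assumption-laden illustration rather than a drop-in replacement: it uses C++11 standard library types instead of Boost, a plain std::vector<short> instead of a bulkio packet, and returns each worker's output instead of pushing it to a port. PacketFanOut and its members are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical fan-out helper: every worker processes the same input packet.
class PacketFanOut {
public:
    explicit PacketFanOut(std::size_t numThreads)
        : results_(numThreads), packet_(nullptr),
          generation_(0), pending_(0), stop_(false) {
        for (std::size_t i = 0; i < numThreads; ++i) {
            threads_.emplace_back(&PacketFanOut::worker, this, i);
        }
    }

    ~PacketFanOut() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        wake_.notify_all();
        for (std::thread& t : threads_) {
            t.join();
        }
    }

    // Hand one packet to every worker and block until all have finished.
    std::vector<std::vector<short> > process(const std::vector<short>& packet) {
        std::unique_lock<std::mutex> lock(mutex_);
        packet_ = &packet;
        pending_ = threads_.size();
        ++generation_;
        wake_.notify_all();
        done_.wait(lock, [this] { return pending_ == 0; });
        return results_;
    }

private:
    void worker(std::size_t index) {
        std::size_t seen = 0;  // last generation this worker processed
        std::unique_lock<std::mutex> lock(mutex_);
        while (true) {
            wake_.wait(lock, [this, seen] { return stop_ || generation_ != seen; });
            if (stop_) {
                return;
            }
            seen = generation_;
            const std::vector<short>* packet = packet_;
            lock.unlock();  // do the work without holding the lock
            std::vector<short> out(packet->size());
            for (std::size_t i = 0; i < out.size(); ++i) {
                out[i] = static_cast<short>((*packet)[i] + index);
            }
            lock.lock();
            results_[index].swap(out);
            if (--pending_ == 0) {
                done_.notify_one();
            }
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;   // coordinator -> workers: new packet or stop
    std::condition_variable done_;   // workers -> coordinator: all finished
    std::vector<std::vector<short> > results_;
    const std::vector<short>* packet_;
    std::size_t generation_;
    std::size_t pending_;
    bool stop_;
    std::vector<std::thread> threads_;
};
```

A caller constructs PacketFanOut once and calls process() for each packet; row N of the result is the input with N added to every sample. Because every lock and unlock happens on the thread that owns the lock, this variant avoids unlocking a mutex from a different thread than the one that locked it, at the cost of being less explicit than the dual-mutex version.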