ConcurrencyPatterns

Active Object

The active object design pattern decouples method execution from method invocation for objects that each reside in their own thread of control.The goal is to introduce concurrency, by using asynchronous method invocation and a scheduler for handling requests. (Wikipedia:Active Object)

 ConcurrencyPatterns

The Active Object decouples method invocation from method execution. The method invocation is performed on the client thread, but the method execution is on the Active Object. The Active Object has its thread and a list of method request objects (short request) to be executed. The client’s method invocation enqueues the requests on the Active Object’s list. The requests are dispatched to the servant.

Also known as

  • Concurrent object

Problem

When many threads access a shared object synchronized, the following challenges must be solved:

  • A thread invoking a processing-intensive member function should not block the other threads invoking the same object for too long.
  • It should be easy to synchronize access to a shared object.
  • The concurrency characteristics of the executed requests should be adaptable to the concrete hardware and software.

Solution

  • The client’s method invocation goes to a proxy, which represents the interface of the Active Object.
  • The servant implements these member functions and runs in the Active Object’s thread. At run time, the proxy transforms the invocation into a method invocation on the servant. This request is enqueued in an activation list by the scheduler.
  • A scheduler’s event loop runs in the same thread as the servant, deques the requests from the activation list, removes them and dispatches them to the servant.
  • The client obtains the method invocation result via a future from the proxy.

Components

The Active Object pattern consists of six components:

  1. The proxy provides an interface for the accessible member functions on the Active Object. The proxy triggers the construction of a request into the activation list. The proxy runs in the client thread.
  2. The method request class defines the interface for the method executing on an Active Object. This interface also contains guard methods, indicating if the job is ready to run. The request includes all context information to be executed later.
  3. The activation list maintains the pending requests. The activation list decouples the client’s thread from the Active Object thread. The proxy inserts the request object, and the scheduler removes them. Consequently, the access to the activation list must be serialized.
  4. The scheduler runs in the thread of the Active Object and decides which request from the activation list is executed next. The scheduler evaluates the guards of the request to determine if the request can run.
  5. The servant implements the Active Object and runs in the active object’s thread. The servant implements the interface of the method request, and the scheduler invokes its member functions.
  6. The proxy creates the future and s only necessary if the request returns a result. Therefore, the client receives the future and can obtain the method invocation result on the Active Object. The client can wait for the outcome or poll for it.

Dynamic Behavior

The dynamic behavior of the Active Object consists of three phases:

  1. Request construction and scheduling: The client invokes a method on the proxy. The proxy creates a request and passes it to the scheduler. The scheduler enqueues the request on the activation list. Additionally, the proxy returns a future to the client if the request returns a result.
  2. Member function execution: The scheduler determines which request becomes runnable by evaluating the guard method of the request. It removes the request from the activation list and dispatches it to the servant. 
  3. Completion: When the request returns something, it is stored in the future. The client can ask for the result. When the client has the result, the request and the future can be deleted.
The following picture shows the sequence of messages.

 

 ActiveObject

 

 

Rainer D 6 P2 500x500Modernes C++ Mentoring

  • "Fundamentals for C++ Professionals" (open)
  • "Design Patterns and Architectural Patterns with C++" (open)
  • "C++20: Get the Details" (open)
  • "Concurrency with Modern C++" (open)
  • "Generic Programming (Templates) with C++": October 2024
  • "Embedded Programming with Modern C++": October 2024
  • "Clean Code: Best Practices for Modern C++": March 2025
  • Do you want to stay informed: Subscribe.

     

    Pros and Cons

    Before I present a minimal example of the Active Object, here are its pros and cons.

    Pros

    • Synchronization is only required on the Active Object’s thread but not on the client’s threads.
    • Clear separation between the client (user) and the server (implementer). The synchronization challenges are on the implementer’s side. 
    • Enhanced throughput of the system because of the asynchronous execution of the requests. Invoking processing-intensive member functions do not block the entire system.
    • The scheduler can implement various strategies to execute the pending requests. If so, the jobs can be executed in a different order they are enqueued.

    Cons

    • If the requests are too fine-grained, the Active Object pattern’s performance overhead, such as the proxy, the activation list, and the scheduler, may be excessive.
    • Due to the scheduler’s scheduling strategy and the operating system’s scheduling, debugging the Active Object is often tricky. In particular, if the jobs are executed in a different order, they are enqueued.

    Example

    The following example presents a simplified implementation of the Active Object. In particular, I don’t define an interface for the method requests on the Active Object, which the proxy and the servant should implement. Further, the scheduler executes the next job when asked for it, and the run member function of the Active Object creates the threads.

    The involved types future<vector<future<pair<bool, int>>>> are often quite verbose. To improve the readability, I heavily applied using declarations (line 1). This example requires a solid knowledge of promises and futures in C++. My posts about tasks provide more details.
     

    // activeObject.cpp
    
    #include <algorithm>
    #include <deque>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <numeric>
    #include <random>
    #include <thread>
    #include <utility>
    #include <vector>
    
    using std::async;                                               // (1)
    using std::boolalpha;
    using std::cout;
    using std::deque;
    using std::distance;
    using std::for_each;
    using std::find_if;
    using std::future;
    using std::lock_guard;
    using std::make_move_iterator;
    using std::make_pair;
    using std::move;
    using std::mt19937;
    using std::mutex;
    using std::packaged_task;
    using std::pair;
    using std::random_device;
    using std::sort;
    using std::jthread;
    using std::uniform_int_distribution;
    using std::vector;
    
    class IsPrime {                                                 // (8)
     public:
        IsPrime(int num): numb{num} {} 
        pair<bool, int> operator()() {
            for (int j = 2; j * j <= numb; ++j){
                if (numb % j == 0) return make_pair(false, numb);
            }
            return make_pair(true, numb);
        }
     private:
        int numb;       
    };
    
    class ActiveObject {
     public:
        
        future<pair<bool, int>> enqueueTask(int i) {
            IsPrime isPrime(i);
            packaged_task<pair<bool, int>()> newJob(isPrime);
            auto isPrimeFuture = newJob.get_future();
            {
                lock_guard<mutex> lockGuard(activationListMutex);
                activationList.push_back(move(newJob));            // (6)
            }
            return isPrimeFuture;
        }
    
        void run() {
            std::jthread j([this] {                                 // (12)
                while ( !runNextTask() );                           // (13)
            });
        }
    
     private:
    
        bool runNextTask() {                                        // (14)
            lock_guard<mutex> lockGuard(activationListMutex);
            auto empty = activationList.empty();
            if (!empty) {                                           // (15)
                auto myTask= std::move(activationList.front());
                activationList.pop_front();
                myTask();
            }
            return empty;
        }
    
        deque<packaged_task<pair<bool, int>()>> activationList;      //(7)
        mutex activationListMutex;
    };
    
    vector<int> getRandNumbers(int number) {
        random_device seed;
        mt19937 engine(seed());
        uniform_int_distribution<> dist(1'000'000, 1'000'000'000);  // (4)
        vector<int> numbers;
        for (long long i = 0 ; i < number; ++i) numbers.push_back(dist(engine)); 
        return numbers;
    }
    
    future<vector<future<pair<bool, int>>>> getFutures(ActiveObject& activeObject, 
                                                       int numberPrimes) {
        return async([&activeObject, numberPrimes] {
            vector<future<pair<bool, int>>> futures;
            auto randNumbers = getRandNumbers(numberPrimes);        // (3)
            for (auto numb: randNumbers){
                futures.push_back(activeObject.enqueueTask(numb));  // (5)
            }
            return futures;
        });
    }
        
    
    int main() {
        
        cout << boolalpha << '\n';
        
         ActiveObject activeObject;
            
        // a few clients enqueue work concurrently                  // (2)
        auto client1 = getFutures(activeObject, 1998);
        auto client2 = getFutures(activeObject, 2003);
        auto client3 = getFutures(activeObject, 2011);
        auto client4 = getFutures(activeObject, 2014);
        auto client5 = getFutures(activeObject, 2017);
        
        // give me the futures                                      // (9)
        auto futures = client1.get();
        auto futures2 = client2.get();
        auto futures3 = client3.get();
        auto futures4 = client4.get();
        auto futures5 = client5.get();
        
        // put all futures together                                 // (10)
        futures.insert(futures.end(),make_move_iterator(futures2.begin()), 
                                     make_move_iterator(futures2.end()));
        
        futures.insert(futures.end(),make_move_iterator(futures3.begin()), 
                                     make_move_iterator(futures3.end()));
        
        futures.insert(futures.end(),make_move_iterator(futures4.begin()), 
                                     make_move_iterator(futures4.end()));
        
        futures.insert(futures.end(),make_move_iterator(futures5.begin()), 
                                     make_move_iterator(futures5.end()));
            
        // run the promises                                         // (11)
        activeObject.run();
        
        // get the results from the futures
        vector<pair<bool, int>> futResults;
        futResults.reserve(futures.size());
        for (auto& fut: futures) futResults.push_back(fut.get());   // (16)
        
        sort(futResults.begin(), futResults.end());                 // (17)
        
        // separate the primes from the non-primes
        auto prIt = find_if(futResults.begin(), futResults.end(), 
                            [](pair<bool, int> pa){ return pa.first == true; });
     
        cout << "Number primes: " << distance(prIt, futResults.end()) << '\n';       // (19)
        cout << "Primes:" << '\n';
        for_each(prIt, futResults.end(), [](auto p){ cout << p.second << " ";} );    // (20)
        
        cout << "\n\n";
        
        cout << "Number no primes: " << distance(futResults.begin(), prIt) << '\n';  // (18)
        cout << "No primes:" << '\n';
        for_each(futResults.begin(), prIt, [](auto p){ cout << p.second << " ";} );
        
        cout << '\n';
        
    }
    
     
    First, the example’s general idea is that clients can enqueue jobs concurrently on the activation list. The servant determines which numbers are prime, and the activation list is part of the Active Object. The Active Object runs the jobs enqueued in the activation list on a separate thread, and the clients can ask for the results.  

    Here are the details.
    The five clients enqueue the work (line 2) on the activeObject via the getFutures function. getFutures takes the activeObject and a number numberPrimes. numberPrimes random numbers are generated (line 3) between 1’000’000 and 1’000’000’000 (line 4) and pushed on the return value: vector<future<pair<bool, int>>. future<pair<bool, int> holds a bool and an int. The bool indicates if the int is a prime. Let’s have a closer look at line (5): futures.push_back(activeObject.enqueueTask(numb)). This call triggers that a new job is enqueued on the activation list (line 6). All calls on the activation list have to be protected. The activation list is a deque of promises (line 7): deque<packaged_task<pair<bool, int>()>>. Each promise performs the function object IsPrime (line 8) when called. The return value is a pair of a bool and an int. The bool indicates if the number int is prime.

    Now, the work packages are prepared. Let’s start the calculation. All clients return in line (9)  their handles to the associated futures. Putting all futures together (line 10) makes my job easier. The call activeObject.run() in line (11) starts the execution. The member function run (line 12) creates the thread to execute the member function runNextTask (line 13). runNextTask (line 14) determines if the deque is not empty (line 15) and creates the new task. By calling futResults.push_back(fut.get()) (line 16) on each future, all results are requested and pushed on futResults. Line (17) sorts the vector of pairs: vector<pair<bool, int>>. The remaining lines present the calculation. The iterator prIt in line 18 holds the first iterator to a pair that has a prime number.                                                                                                                                                                                                                                                          
    The screenshot shows the number of primes distance(prIt, futResults.end()) (line 19) and the primes (line 20). Only the first non-primes are displayed.
     
    activeObject

    What’s Next?

    The Active Object and Monitor Object synchronize and schedule member function invocation. The main difference is that the Active Object executes its member function in a different thread, but the Monitor Object is in the same thread as the client. In my next post, I will present the Monitor Object.

     

     

    Thanks a lot to my Patreon Supporters: Matt Braun, Roman Postanciuc, Tobias Zindl, G Prvulovic, Reinhold Dröge, Abernitzke, Frank Grimm, Sakib, Broeserl, António Pina, Sergey Agafyin, Андрей Бурмистров, Jake, GS, Lawton Shoemake, Jozo Leko, John Breland, Venkat Nandam, Jose Francisco, Douglas Tinkham, Kuchlong Kuchlong, Robert Blanch, Truels Wissneth, Mario Luoni, Friedrich Huber, lennonli, Pramod Tikare Muralidhara, Peter Ware, Daniel Hufschläger, Alessandro Pezzato, Bob Perry, Satish Vangipuram, Andi Ireland, Richard Ohnemus, Michael Dunsky, Leo Goodstadt, John Wiederhirn, Yacob Cohen-Arazi, Florian Tischler, Robin Furness, Michael Young, Holger Detering, Bernd Mühlhaus, Stephen Kelley, Kyle Dean, Tusar Palauri, Juan Dent, George Liao, Daniel Ceperley, Jon T Hess, Stephen Totten, Wolfgang Fütterer, Matthias Grün, Phillip Diekmann, Ben Atakora, Ann Shatoff, Rob North, Bhavith C Achar, Marco Parri Empoli, Philipp Lenk, Charles-Jianye Chen, Keith Jeffery, Matt Godbolt, and Honey Sukesan.

    Thanks, in particular, to Jon Hess, Lakshman, Christian Wittenhorst, Sherhy Pyton, Dendi Suhubdy, Sudhakar Belagurusamy, Richard Sargeant, Rusty Fleming, John Nebel, Mipko, Alicja Kaminska, Slavko Radman, and David Poole.

    My special thanks to Embarcadero
    My special thanks to PVS-Studio
    My special thanks to Tipi.build 
    My special thanks to Take Up Code
    My special thanks to SHAVEDYAKS

    Modernes C++ GmbH

    Modernes C++ Mentoring (English)

    Do you want to stay informed about my mentoring programs? Subscribe Here

    Rainer Grimm
    Yalovastraße 20
    72108 Rottenburg

    Mobil: +49 176 5506 5086
    Mail: schulung@ModernesCpp.de
    Mentoring: www.ModernesCpp.org

    Modernes C++ Mentoring,

     

     

    0 replies

    Leave a Reply

    Want to join the discussion?
    Feel free to contribute!

    Leave a Reply

    Your email address will not be published. Required fields are marked *