Reactor

Event-driven applications, such as GUIs or servers, often apply the Reactor architecture pattern. A Reactor can accept multiple requests simultaneously and distribute them to different handlers.

The Reactor Pattern is an event-driven framework to concurrently demultiplex and dispatch service requests to various service providers. The requests are processed synchronously.

Reactor

Also known as

  • Dispatcher
  • Notifier

Problem

  • A server should
    • answer several client requests simultaneously
    • be performant, stable, and scalable
    • be extendable to support new or improved services
  • The application should be shielded from multi-threading and synchronization challenges

 Solution

  • Each supported service is encapsulated in a handler
  • The handlers are registered within the Reactor
  • The Reactor uses an event demultiplexer to wait synchronously on all incoming events
  • When the Reactor is notified, it dispatches the service request to the specific handler
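
The four points above can be sketched without any real I/O. The following is only an illustration under stated assumptions: MiniReactor, Request, post, and the service ids are hypothetical names, and a plain queue stands in for the event demultiplexer.

```cpp
#include <functional>
#include <map>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// A service request: the targeted service and its payload (hypothetical type).
struct Request {
  int serviceId;
  std::string payload;
};

// Hypothetical in-memory stand-in for a Reactor; it performs no real I/O.
class MiniReactor {
 public:
  using Handler = std::function<std::string(const std::string&)>;

  // Each supported service is encapsulated in a handler and registered here.
  void registerHandler(int serviceId, Handler handler) {
    handlers_[serviceId] = std::move(handler);
  }

  // Stand-in for an event source: queue an incoming request.
  void post(Request request) { pending_.push(std::move(request)); }

  // Drain the queue and dispatch each request to its registered handler.
  // Requests for services without a handler are skipped.
  std::vector<std::string> run() {
    std::vector<std::string> replies;
    while (!pending_.empty()) {
      Request request = std::move(pending_.front());
      pending_.pop();
      auto it = handlers_.find(request.serviceId);
      if (it != handlers_.end()) {
        replies.push_back(it->second(request.payload));
      }
    }
    return replies;
  }

 private:
  std::map<int, Handler> handlers_;
  std::queue<Request> pending_;
};
```

A new service only requires one more registerHandler call; the dispatch loop stays unchanged, which is the extensibility the problem statement asks for.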

Structure

[Figure: UML diagram of the Reactor]

[Figure: CRC cards of the Reactor]

Handles

  • The handles identify different event sources, such as network connections, open files, or GUI events.
  • The event source generates events such as connect, read, or write, which are queued on the associated handle.

Synchronous event demultiplexer

  • The synchronous event demultiplexer waits for one or more indication events and blocks until the associated handle can process the event.
  • The system calls select, poll, epoll, kqueue, or WaitForMultipleObjects enable it to wait for indication events.

Event handler

 


     

    • The event handler defines the interface for processing the indication events.
    • The event handler defines the supported services of the application.

    Concrete event handler

    • The concrete event handler implements the interface of the application defined by the event handler.
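
The split between the event handler interface and a concrete event handler might look as follows. This is a sketch, not the interface of a particular framework: EventType, EventHandler, and LoggingHandler are hypothetical names chosen for illustration.

```cpp
#include <string>

// The kinds of indication events a handler may be asked to process.
enum class EventType { Read, Write, Error };

// Event handler: the interface the Reactor uses for every service.
class EventHandler {
 public:
  virtual ~EventHandler() = default;
  virtual int handle() const = 0;                // the event source this handler belongs to
  virtual void handleEvent(EventType type) = 0;  // called back by the Reactor
};

// Concrete event handler: a hypothetical service that records what it was asked to do.
class LoggingHandler : public EventHandler {
 public:
  explicit LoggingHandler(int fd) : fd_(fd) {}

  int handle() const override { return fd_; }

  void handleEvent(EventType type) override {
    switch (type) {
      case EventType::Read:  log_ += "read;";  break;
      case EventType::Write: log_ += "write;"; break;
      case EventType::Error: log_ += "error;"; break;
    }
  }

  const std::string& log() const { return log_; }

 private:
  int fd_;
  std::string log_;
};
```

The Reactor only ever sees EventHandler, so services can be added or replaced without touching the dispatching code.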

    Reactor

    • The Reactor supports an interface to register and deregister the concrete event handler using file descriptors.
    • The Reactor uses a synchronous event demultiplexer to wait for indication events. An indication event can be a reading event, a writing event, or an error event.
    • The Reactor maps the events to their concrete event handler.
    • The Reactor manages the lifetime of the event loop.

    The Reactor (not the application) waits for the indication events, demultiplexes them, and dispatches them to the concrete event handlers that are registered with it. The Reactor thereby inverts the flow of control: the application does not call the framework; the framework calls the application. This inversion of control is often called the Hollywood principle (“Don’t call us, we’ll call you”).

    The dynamic behavior of a Reactor is pretty interesting.

    Dynamic Behavior

    The following points illustrate the control flow between the Reactor and the event handler.

    • The application registers an event handler for specific events in the Reactor.
    • Each event handler provides its specific handle to the Reactor.
    • The application starts the event loop. The event loop waits for indication events.
    • The event demultiplexer returns to the Reactor when an event source becomes ready.
    • The Reactor dispatches the handles to the corresponding event handler.
    • The event handler processes the event.

    Let’s study the Reactor in action.

    Example

    This example uses the POCO framework. “The POCO C++ Libraries are powerful cross-platform C++ libraries for building network- and internet-based applications that run on desktop, server, mobile, IoT, and embedded systems.”

    // reactor.cpp
    
    #include <fstream>
    #include <string>
    #include <vector>
    
    #include "Poco/Net/SocketReactor.h"
    #include "Poco/Net/SocketAcceptor.h"
    #include "Poco/Net/SocketNotification.h"
    #include "Poco/Net/StreamSocket.h"
    #include "Poco/Net/ServerSocket.h"
    #include "Poco/Observer.h"
    #include "Poco/Thread.h"
    #include "Poco/Util/ServerApplication.h"
    
    using Poco::Observer;
    using Poco::Thread;
    
    using Poco::Net::ReadableNotification;
    using Poco::Net::ServerSocket;
    using Poco::Net::ShutdownNotification;
    using Poco::Net::SocketAcceptor;
    using Poco::Net::SocketReactor;
    using Poco::Net::StreamSocket;
    
    using Poco::Util::Application;
    
    class EchoHandler {
     public:
      EchoHandler(const StreamSocket& s, SocketReactor& r): socket(s), reactor(r) {           // (11)
        reactor.addEventHandler(socket, 
          Observer<EchoHandler, ReadableNotification>(*this, &EchoHandler::socketReadable));
      }
    
      void socketReadable(ReadableNotification*) {
        char buffer[8];
        int n = socket.receiveBytes(buffer, sizeof(buffer));
        if (n > 0) {
          socket.sendBytes(buffer, n);                                                       // (13)                                                    
        }
        else {
          reactor.removeEventHandler(socket,                                                 // (12)
            Observer<EchoHandler, ReadableNotification>(*this, &EchoHandler::socketReadable));
          delete this;
        }
      }
    
     private:
      StreamSocket socket;
      SocketReactor& reactor;
    };
    
    class DataHandler {
     public:
    
      DataHandler(StreamSocket& s, SocketReactor& r): socket(s), reactor(r), 
                                                      outFile("reactorOutput.txt") {
        reactor.addEventHandler(socket,                                                   // (14) 
          Observer<DataHandler, ReadableNotification>(*this, &DataHandler::socketReadable));
        reactor.addEventHandler(socket,                                                   // (15)
          Observer<DataHandler, ShutdownNotification>(*this, &DataHandler::socketShutdown));
        socket.setBlocking(false);
      }
    
      ~DataHandler() {                                                                    // (16)
        reactor.removeEventHandler(socket, 
          Observer<DataHandler, ReadableNotification>(*this, &DataHandler::socketReadable));
        reactor.removeEventHandler(socket, 
          Observer<DataHandler, ShutdownNotification>(*this, &DataHandler::socketShutdown));
      }
    
      void socketReadable(ReadableNotification*) {
        char buffer[64];
        int n = 0;
        do {
          n = socket.receiveBytes(&buffer[0], sizeof(buffer));
          if (n > 0) {
            std::string s(buffer, n);
            outFile << s << std::flush;                                                 // (17)
          }
          else break;
        } while (true);
      }
    
      void socketShutdown(ShutdownNotification*) {
        delete this;
      }
    
     private:
      StreamSocket socket;
      SocketReactor& reactor;
      std::ofstream outFile;
    };
    
    class Server: public Poco::Util::ServerApplication {
    
     protected:
      void initialize(Application& self) {                                   // (3)
        ServerApplication::initialize(self);
      }
    		
      void uninitialize() {                                                  // (4)
        ServerApplication::uninitialize();
      }
    
      int main(const std::vector<std::string>&) {                            // (2)
    		
        ServerSocket serverSocketEcho(4711);                                 // (5)
        ServerSocket serverSocketData(4712);                                 // (6)
        SocketReactor reactor;
        SocketAcceptor<EchoHandler> acceptorEcho(serverSocketEcho, reactor); // (7)
        SocketAcceptor<DataHandler> acceptorData(serverSocketData, reactor); // (8)
        Thread thread;
        thread.start(reactor);                                               // (9)
        waitForTerminationRequest();
        reactor.stop();                                                      // (10)
        thread.join();
            
        return Application::EXIT_OK;
    
      }
    
    };
    
    int main(int argc, char** argv) {
    
      Server app;                                                             // (1)
      return app.run(argc, argv);
    
    }
    

     

    Line (1) creates the TCP server. The server runs its main function (line 2) and is initialized in line (3) and uninitialized in line (4). The main function of the TCP server creates two server sockets, listening on port 4711 (line 5) and port 4712 (line 6). Lines (7) and (8) create the socket acceptors that connect the server sockets with the EchoHandler and the DataHandler. The SocketAcceptor models the Acceptor component of the Acceptor-Connector design pattern. The reactor runs in a separate thread (line 9) until the server receives its termination request and stops the reactor (line 10).

    The EchoHandler registers its read handler in the constructor (line 11) and unregisters it in the member function socketReadable (line 12) when receiveBytes returns no more data. The echo service sends the client’s message back (line 13). In contrast, the DataHandler enables a client to transfer data to the server. In its constructor, the handler registers its actions for read events (line 14) and shutdown events (line 15). Both are unregistered in the destructor of DataHandler (line 16). The transferred data is written directly to the file stream outFile (line 17).

    The following output shows the server on the left and both clients on the right. A telnet session serves as a client. The first client connects to port 4711: telnet 127.0.0.1 4711. This client connects to the echo service, which sends each request back to it. The second client connects to port 4712: telnet 127.0.0.1 4712. The server’s output shows that the client data is transferred to the server.

    [Figure: the server and both telnet clients in action]

    What are the pros and cons of the Reactor?

    Pros and Cons

    Pros

    • A clear separation of framework and application logic.
    • The modularity of various concrete event handlers.
    • The Reactor can be ported to various platforms because the underlying event demultiplexing functions are widely available: select and poll on Unix-like systems, epoll on Linux, kqueue on BSD and macOS, and WaitForMultipleObjects on Windows.
    • The separation of interface and implementation enables easy adaptation or extension of the services.
    • The overall structure supports concurrent execution.

    Cons

    • Requires an event demultiplexing system call.
    • A long-running event handler can block the Reactor.
    • The inversion of control makes testing and debugging more difficult.
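
The second point can be made concrete with a small timing sketch. It assumes a single-threaded event loop that calls its handlers one after another; dispatchSequentially is a hypothetical helper, and sleeping stands in for a long-running handler.

```cpp
#include <chrono>
#include <functional>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;

// Runs the handlers one after another, as a single-threaded event loop would,
// and returns how long each handler had to wait before it was started.
std::vector<std::chrono::milliseconds> dispatchSequentially(
    const std::vector<std::function<void()>>& handlers) {
  std::vector<std::chrono::milliseconds> startDelays;
  const auto loopStart = Clock::now();
  for (const auto& handler : handlers) {
    startDelays.push_back(
        std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - loopStart));
    handler();   // the loop cannot dispatch anything else until this returns
  }
  return startDelays;
}
```

If the first handler sleeps for 50 milliseconds, the second one cannot start earlier than that, even when its own work is trivial.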

    What’s Next?

    There are many well-established patterns used in the concurrency domain. They deal with synchronization challenges such as sharing and mutation but also with concurrent architectures. In my next post, I will start with the patterns focusing on data sharing.

     
