Publisher-Subscriber

Other Names

Observer-Observable

Problem

Various "handler" objects need to be notified when a "device" object changes state. Unfortunately, the number and types of handlers can vary dynamically. We want to avoid tight coupling and polling.

Solution

Handlers should register with the devices they want to monitor using a subscribe() function provided by the device. Each handler provides a function called update() (of course the implementation of update() will depend on the type of handler). When the device changes state, it notifies each registered handler by calling the handler's update() function.

Devices inherit subscribe(), unsubscribe(), and notify() from a predefined Publisher base class. All handler classes are derived from an abstract Subscriber base class with a pure virtual update() function.

Static Structure

Dynamic Structure

Variations

The solution described above is called the pull version because it is the handler's job to fetch the state of the device it monitors. In the push version, the device's state is passed as an argument to the update() function.

Sometimes handlers are more interested in events than in the devices that produce them. In this case we can introduce a single event manager object that maintains a table associating possible events with interested subscribers. A handler uses the event manager's subscribe() function to register itself to be notified of specific events. When a device changes state, it notifies the event manager, which in turn notifies each subscriber interested in that event.

 

Implementation

pubsub.h

/***********************************************
class Subscriber
************************************************
*/
class Subscriber
{
public:
   // Handlers are referred to through Subscriber pointers, so the base needs
   // a virtual destructor: destroying a handler through a Subscriber* must
   // run the derived class's destructor.
   virtual ~Subscriber() {}

   // Invoked by a publisher's notify() when the publisher's state has changed.
   // Pull variant: no state is passed in, the handler fetches what it needs.
   virtual void update() = 0;
};

 

/***********************************************
class Publisher (baby version)
************************************************
*/
class Publisher
{
public:
   enum { MAX = 100 };
   Publisher() { numSubscribers = 0; }
   void subscribe(Subscriber* s);
   void unsubscribe(Subscriber* s);
   void notify();
private:
   Subscriber* subscribers[MAX];
   int numSubscribers;
};

pubsub.cpp

#include "pubsub.h"
#include "utils.h"

 

/************************************************/
void Publisher::subscribe(Subscriber* s)
/*   PURPOSE:      register a subscriber by appending it to the subscribers array
   RECEIVES:   s, the subscriber to be added
   RETURNS:   
   REMARKS:      calls error("Publisher full") when all MAX slots are in use
*/
{
   // Guard clause: refuse the registration when the table has no free slot.
   if (numSubscribers >= MAX)
   {
      error("Publisher full"); // or return false?
      return;
   }

   subscribers[numSubscribers] = s;
   numSubscribers++;
}

 


/************************************************/
void Publisher::unsubscribe(Subscriber* s)
/*   PURPOSE:      remove subscriber from subscribers array
   RECEIVES:   subscriber to be removed
   RETURNS:   
   REMARKS:      only eliminates first occurrence of s;
               does nothing if s is not registered
*/
{
   // Locate the first slot that holds s.
   int pos = 0;
   while (pos < numSubscribers && subscribers[pos] != s)
      pos++;

   if (pos == numSubscribers)
      return; // s is not registered: nothing to remove

   // Close the gap by shifting the tail down one slot. The loop stops one
   // short of the end so subscribers[numSubscribers] is never read: that
   // slot is unassigned, and lies past the end of the array when the
   // publisher holds MAX subscribers.
   for (int i = pos; i + 1 < numSubscribers; i++)
      subscribers[i] = subscribers[i + 1];

   numSubscribers--;
}

 

/************************************************/
void Publisher::notify()
/*   PURPOSE:   notify subscribers that publisher's state has changed
   RECEIVES:   
   RETURNS:   
   REMARKS:   pull variant, subscribers must fetch state
*/

{
   for(int i = 0; i < numSubscribers; i++)
      subscribers[i]->update();
}

 

A Hipper Version

#include <list>

using namespace std;

 

class Publisher
{
public:
   void subscribe(Subscriber* s);
   void unsubscribe(Subscriber* s);
   void notify();
private:
   list<Subscriber*> subscribers;
};

 

/************************************************/
void Publisher::subscribe(Subscriber* s)
/*   PURPOSE:      register a subscriber by appending it to the subscribers list
   RECEIVES:   s, the subscriber to be added
   RETURNS:   
   REMARKS:      no duplicate check: subscribing twice stores s twice
*/
{
   // Inserting at end() is the same as appending to the back of the list.
   subscribers.insert(subscribers.end(), s);
}

 


/************************************************/
void Publisher::unsubscribe(Subscriber* s)
/*   PURPOSE:      remove subscriber from the subscribers list
   RECEIVES:   s, the subscriber to be removed
   RETURNS:   
   REMARKS:      removes every occurrence of s; does nothing if s is absent
*/
{
   std::list<Subscriber*>::iterator pos = subscribers.begin();
   while (pos != subscribers.end())
   {
      if (*pos == s)
         pos = subscribers.erase(pos); // erase() returns the following element
      else
         ++pos;
   }
}

 

/************************************************/
void Publisher::notify()
/*   PURPOSE:   notify subscribers that publisher's state has changed
   RECEIVES:   
   RETURNS:   
   REMARKS:   pull variant, subscribers must fetch state
*/
{
   list<Subscriber*>::iterator it;
   for(it = subscribers.begin(); it != subscribers.end(); it++)
      (*it)->update();
}

 

Example

Reactor

 

#include "pubsub.h"

#include "utils.h"

 

class Reactor: public Publisher
{
public:
   enum { CRITICAL = 1500 };
   Reactor(double t = 1000) { temp = t; }

 

   void incTemp(double amt = 15)
   {
      temp += amt;
      if (temp > CRITICAL)
         notify();
   }

 

   void decTemp(double amt = 4) { temp -= amt; }

   bool tooHot() { return temp > CRITICAL; }

   double getTemp() { return temp; }

 

private:
   double temp; // reactor's temperature
};

 

Beeping Alarm

 

class Alarm: Subscriber
{
public:

   

   Alarm(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
   }

 

   ~Alarm() { myReactor->unsubscribe(this); }

 

   void update() { cout << '\a'; } // sound alarm

 

private:
   Reactor* myReactor;

};

 

Printing Alarm

 

class Alarm2: Subscriber
{
public:

 

   static int count;

 

   Alarm2(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
      id = count++;
   }

 

   ~Alarm2() { myReactor->unsubscribe(this); }

 

   void update()
   {
      cout << "Alarm #" << id << " Warning: reactor too hot!\n";
   }

 

private:
   Reactor* myReactor;
   int id;

};

 

int Alarm2::count = 0;

 

Thermostat

class Thermostat: public Subscriber
{
public:
   Thermostat(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
   }

 

   ~Thermostat() { myReactor->unsubscribe(this); }

 

   void update()
   {
      while (myReactor->tooHot())
         myReactor->decTemp();
   }
private:
   Reactor* myReactor;
};

 

Main

#include "reactor.h"
#include <iostream>
#include <string>
#include "utils.h"
using namespace std;

 

int main()
{
   Reactor r(1490);
   Alarm a(&r), b(&r);
   Alarm2 c(&r), d(&r), e(&r);
   Thermostat t(&r);

 

   bool done = false;
   string cmmd;

 

   while (!done)
   {
      cout << "command-> ";
      cin >> cmmd;

 

      if (cmmd == "quit")
      {
         cout << "bye";
         done = true;
      }
      else if (cmmd == "inc")
      {
         string temp;
         cin >> temp;
         double amt;
         fromString(amt, temp);
         r.incTemp(amt);
         cout << "temp = " << r.getTemp() << "\n";
      }
      else if (cmmd == "dec")
      {
         string temp;
         cin >> temp;
         double amt;
         fromString(amt, temp);
         r.decTemp(amt);
         cout << "temp = " << r.getTemp() << "\n";
      }
      else if (cmmd == "temp")
      {
         cout << "temp = " << r.getTemp() << "\n";
      }
      else
         cout << "Unrecognized command: " << cmmd << '\n';
   } // while

 

   return 0;

 

}// main

Program Output

command-> temp
temp = 1490
command-> inc 12
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1498
command-> inc 50
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1500
command-> inc 1
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1497
command->