Publisher-Subscriber

Publisher-Subscriber is one example of an event-notification mechanism.

Publisher-Subscriber [Go4], [POSA], [LAR], [ROG]

Other Names

Publishers are also called senders, observables, subjects, and notifiers. Subscribers are also called receivers, listeners, observers and callbacks.

Problem

Various monitors need to be notified when a device changes state, but the number and types of monitors can vary dynamically. We want to avoid polling, and we don't want to make assumptions in the device code about the numbers and types of monitors.

Solution

The device should maintain a list of pointers to registered monitors. Monitors can be different, but each must implement the same interface, which includes an update() function that the device will call when it changes state. The list of monitor pointers can be managed by a reusable Publisher base class. An abstract Subscriber base class defines the interface monitors must implement.

Static Structure

In a class diagram the name of an abstract class, i.e., a class that isn't instantiated, is italicized. Subscriber is an abstract class because update() is a pure virtual function that will be implemented differently in different derived classes.

Dynamic Structure

Entries may also include sequence diagrams. Assume two monitors subscribe to a device. When the device changes state, monitor1.f() and monitor2.g() must be called:

A C++ Implementation of Publisher-Subscriber

Pattern catalog entries don't usually include implementations, because they are meant to be language independent. It is the programmer's job to provide the implementation. Some patterns are so useful, they are provided in well known libraries. For example, publisher and subscriber classes are provided in the Java utility package, where they are called Observable and Observer. Specializations of publisher and subscriber classes are also provided in the Microsoft Foundation Class library, where they are called CDocument and CView, and IBM's Visual Age library, where they are called Notifier and Subscriber. Unfortunately, the standard C++ library doesn't include publisher and subscriber, so we'll have to implement them ourselves. There are many approaches, we follow one loosely based on the Java implementation.

pubsub.h

We place the publisher and subscriber class declarations in a header file called pubsub.h. Here is the structure of the file:

// pubsub.h
#ifndef PUBSUB_H
#define PUBSUB_H
#include <list> // for our subscriber list
using namespace std; // remove list from std namespace
class Publisher; // forward reference

class Subscriber { ... };
class Publisher { ... };

#endif

The subscriber interface consists of a single function called update(). To allow for the possibility that a monitor might subscribe to multiple devices, we pass a pointer to the notifying publisher. To allow messages to be sent from the device, we provide an optional generic pointer:

class Subscriber
{
public:
   virtual void update(Publisher* who, void* what = 0) = 0;
};

A publisher maintains a list of pointers to subscribers. Monitors implementing the subscriber interface can add themselves to the list using subscribe(), and remove themselves using unsubscribe(). Devices can call the update() function of each registered monitor by calling notify(). A client may need to change the state of a device without notifying the monitors. This can be done by first setting the notifyEnabled flag to false.

class Publisher
{
public:
   Publisher() { notifyEnabled = true; }
   void subscribe(Subscriber* s) { subscribers.push_back(s); }
   void unsubscribe(Subscriber* s) { subscribers.remove(s); }
   void notify(void* what = 0);
   void setNotifyEnabled(bool flag) { notifyEnabled = flag; }
   bool getNotifyEnabled() const { return notifyEnabled; }
private:
   list<Subscriber*> subscribers;
   bool notifyEnabled;
};

pubsub.cpp

Including the implementation of notify() in the Publisher class declaration would make it inline, but some compilers won't allow inline functions containing iterative statements. Therefore, we move the implementation to a file called pubsub.cpp. We must not forget to include pubsub.h in this file:

// pubsub.cpp
#include "pubsub.h"

Here is our implementation of notify(). Notice that notifyEnabled is automatically set to true at the end of the function. This protects against clients who may forget to set the flag back to true after setting it to false:

void Publisher::notify(void* what /* = 0 */)
{
   if (notifyEnabled)
   {
      list<Subscriber*>::iterator p;
      for(p = subscribers.begin(); p != subscribers.end(); p++)
         (*p)->update(this, what);
   }
   notifyEnabled = true;
}

Example: Nuclear Reactor

Let's build a prototype for controller software that runs a nuclear reactor. Here are some of the include directives we will need:

class Reactor: public Publisher
{
public:
   Reactor(double t = 0, double c = 1500)
      : critical(c) { temp = t; }
   void incTemp(double amt = 4) { temp += amt; notify(); }
   void decTemp(double amt = 15) { temp -= amt; notify(); }
   double getTemp() const { return temp; }
   bool tooHot() const { return critical <= temp; }
private:
   double temp; // core temperature
   const double critical; // too hot!
};

Beeping Alarms

A beeping alarm rings the keyboard bell if the notifying reactor is too hot:

class BeepingAlarm: public Subscriber
{
public:
   void update(Publisher* p, void* what)
   {
      Reactor* r = (Reactor*) p;
      if (r->tooHot()) cout << '\a'; // sound alarm
   }
};

Printing Alarms

A printing alarm frantically displays warning messages on the monitor. To make the example more interesting, each printing alarm has a unique identification number, which is automatically generated from the static count variable.

class PrintingAlarm: public Subscriber
{
public:
   PrintingAlarm() { id = count++; }
   void update(Publisher* p, void* what)
   {
      Reactor* r = (Reactor*) p;
      if (r->tooHot())
         cerr << "Alarm #" << id << " Warning: reactor too hot!\n";
   }
private:
   int id;
   static int count;
};

Because count is static, its declaration inside PrintingAlarm is only a declaration, not a definition. We will need to define and initialize it elsewhere:

int PrintingAlarm::count = 0;

Notice that outside the class declaration, count must be qualified by the class name using the scope resolution operator. Where would be a good place to put this definition? Not in a header file. C++ uses the One Definition Rule, which states that a definition can only appear once in a program. If a definition is placed in a header file, which is subsequently included in several files, then the One Definition Rule is violated. This results is an error message from the linker. If the PrintingAlarm declarations is placed in a header file called palarm.h, then create a new file called palarm.cpp containing the definition of count.

Thermostats

Our thermostat class demonstrates a different style of subscriber. It maintains a pointer to its one and only publisher. During construction, the pointer is initialized and the thermostat subscribes itself. During destruction, the thermostat unsubscribes itself, thus avoiding a dangling reference in the publisher's subscribers list. This relieves the user from manually performing the calls to subscribe() and unsubscribe():

class Thermostat: public Subscriber
{
public:
   Thermostat(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
   }
   ~Thermostat() { myReactor->unsubscribe(this); }
   void update(Publisher* p, void* what);
private:
   Reactor* myReactor;
};

A thermostat's update() function repeatedly decrements the reactor's temperature until it dips below the critical level:

void Thermostat::update(Publisher* p, void* what);
{
   while(myReactor->tooHot()) // we can ignore p
   {
      myReactor->setNotifyEnabled(false);
      myReactor->decTemp();
   }
}

Failing to set the notifyEnabled flag to false launches a weird recursion, because decTemp() will call notify(), which will call all of the update() functions, including this one!