Publisher-Subscriber
Other Names
Observer-Observable
Problem
Various "handler" objects need to be notified when a "device" object changes state. Unfortunately, the number and types of handlers can vary dynamically. We want to avoid tight coupling and polling.
Solution
Handlers should register with the devices they want to monitor using a subscribe() function provided by the device. Each handler provides a function called update() (of course the implementation of update() will depend on the type of handler). When the device changes state, it notifies each registered handler by calling the handler's update() function.
Devices inherit subscribe(), unsubscribe(), and notify() from a predefined Publisher base class. All handler classes are derived from an abstract Subscriber base class with a pure virtual update() function.
Static Structure
Dynamic Structure
Variations
The solution described above is called the pull version, because it is the job of the handler to fetch the state of the device it monitors after being notified. In the push version, the state of the device is instead passed to the handler as an argument of update(). The implementation below uses the pull version.
Sometimes handlers are more interested in events than the devices that produce the events. In this case we can introduce a single event manager object that maintains a table associating possible events with interested subscribers. A handler uses the event manager's subscribe() function to register itself to be notified for specific events. When a device changes state, it notifies the event manager, which notifies each subscriber interested in this event.
Implementation
pubsub.h
/***********************************************
class Subscriber
************************************************
*/
/* Abstract interface implemented by every handler that wants to be
   notified by a Publisher. */
class Subscriber
{
public:
   // Virtual destructor: concrete handlers may be destroyed through a
   // Subscriber*; without it that delete would be undefined behavior.
   virtual ~Subscriber() {}

   // Called by the publisher when its state has changed. Pull variant:
   // no state is passed in, the handler must query the publisher itself.
   virtual void update() = 0;
};
/***********************************************
class Publisher (baby version)
************************************************
*/
/* Base class for objects whose state changes are observed by Subscribers.
   Fixed-capacity version: at most MAX subscriber pointers are stored in a
   plain array. The publisher stores the pointers only; it never deletes
   the subscribers. */
class Publisher
{
public:
   enum { MAX = 100 };                 // capacity of the subscribers array

   Publisher() : numSubscribers(0) {}  // starts with no subscribers

   void subscribe(Subscriber* s);      // append s to the array
   void unsubscribe(Subscriber* s);    // remove first occurrence of s
   void notify();                      // call update() on every subscriber

private:
   Subscriber* subscribers[MAX];       // slots [0, numSubscribers) are in use
   int numSubscribers;                 // number of slots currently in use
};
pubsub.cpp
#include "pubsub.h"
#include "utils.h"
/************************************************/
void Publisher::subscribe(Subscriber* s)
/* PURPOSE: append a subscriber to the subscribers array
RECEIVES: s, the subscriber to be added
RETURNS:
REMARKS: when all MAX slots are in use, s is not stored and
         error("Publisher full") is called instead
*/
{
   if (numSubscribers >= MAX)
   {
      error("Publisher full"); // or return false?
      return;
   }
   subscribers[numSubscribers] = s;
   ++numSubscribers;
}
/************************************************/
void Publisher::unsubscribe(Subscriber* s)
/* PURPOSE: remove subscriber from subscribers array
RECEIVES: s, the subscriber to be removed
RETURNS:
REMARKS: only eliminates the first occurrence of s; does nothing if s
         is not subscribed. The order of the remaining subscribers is
         preserved.
*/
{
   // Locate the first occurrence of s.
   int pos = 0;
   while (pos < numSubscribers && subscribers[pos] != s)
      ++pos;
   if (pos == numSubscribers)
      return; // s is not subscribed

   // Close the gap. The bound i + 1 < numSubscribers keeps every read
   // inside the used part of the array, so no slot past the last
   // subscriber (possibly index MAX when the array is full) is touched.
   for (int i = pos; i + 1 < numSubscribers; ++i)
      subscribers[i] = subscribers[i + 1];
   --numSubscribers;
}
/************************************************/
void Publisher::notify()
/* PURPOSE: notify subscribers that publisher's state has changed
RECEIVES:
RETURNS:
REMARKS: pull variant, subscribers must fetch state
*/
{
for(int i = 0; i < numSubscribers; i++)
subscribers[i]->update();
}
A Hipper Version
#include <list>
using namespace std;
class Publisher
{
public:
void subscribe(Subscriber* s);
void unsubscribe(Subscriber* s);
void notify();
private:
list<Subscriber*> subscribers;
};
/************************************************/
void Publisher::subscribe(Subscriber* s)
/* PURPOSE: add subscriber to the end of the subscribers list
RECEIVES: s, the subscriber to be added
RETURNS:
REMARKS: no duplicate check; subscribing the same pointer twice
         stores it twice
*/
{
   subscribers.insert(subscribers.end(), s);
}
/************************************************/
void Publisher::unsubscribe(Subscriber* s)
/* PURPOSE: remove subscriber from the subscribers list
RECEIVES: s, the subscriber to be removed
RETURNS:
REMARKS: removes every entry equal to s; does nothing if s is
         not in the list
*/
{
   std::list<Subscriber*>::iterator pos = subscribers.begin();
   while (pos != subscribers.end())
   {
      if (*pos == s)
         pos = subscribers.erase(pos); // erase returns the following element
      else
         ++pos;
   }
}
/************************************************/
void Publisher::notify()
/* PURPOSE: notify subscribers that publisher's state has changed
RECEIVES:
RETURNS:
REMARKS: pull variant, subscribers must fetch state
*/
{
list<Subscriber*>::iterator it;
for(it = subscribers.begin(); it != subscribers.end(); it++)
(*it)->update();
}
Example
Reactor
#include "pubsub.h"
#include "utils.h"
/* Example publisher: a reactor that tracks its temperature and notifies
   its subscribers whenever an increase leaves the temperature above
   CRITICAL. */
class Reactor: public Publisher
{
public:
   enum { CRITICAL = 1500 };   // notification threshold

   // t is the initial temperature.
   Reactor(double t = 1000) : temp(t) {}

   // Raises the temperature by amt; if the result exceeds CRITICAL,
   // all subscribers are notified.
   void incTemp(double amt = 15)
   {
      temp += amt;
      if (tooHot())
         notify();
   }

   // Lowers the temperature by amt. Never notifies.
   void decTemp(double amt = 4) { temp -= amt; }

   // Read-only queries; const so they can be used through a
   // const Reactor& or const Reactor*.
   bool tooHot() const { return temp > CRITICAL; }
   double getTemp() const { return temp; }

private:
   double temp; // reactor's temperature
};
Beeping Alarm
/* Subscriber that sounds the terminal bell each time its reactor
   notifies it. Registers itself on construction and unregisters on
   destruction. */
// Public inheritance (the original used the class default, private):
// an Alarm is-a Subscriber, consistent with Thermostat, and code outside
// the class can convert an Alarm* to a Subscriber*.
class Alarm: public Subscriber
{
public:
   // r is the reactor to monitor; it must outlive this alarm because
   // the destructor calls r->unsubscribe().
   Alarm(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
   }
   ~Alarm() { myReactor->unsubscribe(this); }

   void update() { cout << '\a'; } // sound alarm

private:
   Reactor* myReactor;   // monitored reactor (not owned)
};
Printing Alarm
/* Subscriber that prints a numbered warning each time its reactor
   notifies it. Registers itself on construction and unregisters on
   destruction. */
// Public inheritance (the original used the class default, private):
// an Alarm2 is-a Subscriber, consistent with Thermostat, and code outside
// the class can convert an Alarm2* to a Subscriber*.
class Alarm2: public Subscriber
{
public:
   static int count;   // number of Alarm2 objects constructed so far

   // r is the reactor to monitor; it must outlive this alarm because
   // the destructor calls r->unsubscribe().
   Alarm2(Reactor* r)
   {
      myReactor = r;
      myReactor->subscribe(this);
      id = count++;     // ids are handed out in construction order: 0, 1, 2, ...
   }
   ~Alarm2() { myReactor->unsubscribe(this); }

   void update()
   {
      cout << "Alarm #" << id << " Warning: reactor too hot!\n";
   }

private:
   Reactor* myReactor;  // monitored reactor (not owned)
   int id;              // this alarm's sequence number
};
int Alarm2::count = 0;
Thermostat
/* Subscriber that reacts to a notification by cooling its reactor until
   the reactor no longer reports being too hot. Registers itself on
   construction and unregisters on destruction. */
class Thermostat: public Subscriber
{
public:
   // r is the reactor to monitor and control; it must outlive this
   // thermostat because the destructor calls r->unsubscribe().
   Thermostat(Reactor* r) : myReactor(r)
   {
      myReactor->subscribe(this);
   }

   ~Thermostat()
   {
      myReactor->unsubscribe(this);
   }

   // Pull variant: asks the reactor for its state and keeps calling
   // decTemp() (with its default step) until tooHot() is false.
   void update()
   {
      for (;;)
      {
         if (!myReactor->tooHot())
            break;
         myReactor->decTemp();
      }
   }

private:
   Reactor* myReactor;   // monitored reactor (not owned)
};
Main
#include "reactor.h"
#include <iostream>
#include <string>
#include "utils.h"
using namespace std;
/* Interactive test driver. Creates a reactor with two beeping alarms,
   three printing alarms and a thermostat, then reads commands from
   standard input:
      inc <amt>   raise the temperature by amt
      dec <amt>   lower the temperature by amt
      temp        print the current temperature
      quit        leave the program
   The loop also ends when standard input is exhausted or fails. */
int main()
{
   Reactor r(1490);
   Alarm a(&r), b(&r);
   Alarm2 c(&r), d(&r), e(&r);
   Thermostat t(&r);
   bool done = false;
   string cmmd;
   while (!done)
   {
      cout << "command-> ";
      if (!(cin >> cmmd))
      {
         // End of input or stream error: without this check the loop
         // would spin forever printing the prompt.
         done = true;
      }
      else if (cmmd == "quit")
      {
         cout << "bye";
         done = true;
      }
      else if (cmmd == "inc" || cmmd == "dec")
      {
         string temp;
         if (!(cin >> temp))
         {
            // Input ended before the amount was given.
            done = true;
         }
         else
         {
            // Initialized so the reactor never receives an indeterminate
            // value should fromString() leave amt untouched.
            double amt = 0;
            fromString(amt, temp);
            if (cmmd == "inc")
               r.incTemp(amt);
            else
               r.decTemp(amt);
            cout << "temp = " << r.getTemp() << "\n";
         }
      }
      else if (cmmd == "temp")
      {
         cout << "temp = " << r.getTemp() << "\n";
      }
      else
         cout << "Unrecognized command: " << cmmd << '\n';
   } // while
   return 0;
}// main
Program Output
command-> temp
temp = 1490
command-> inc 12
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1498
command-> inc 50
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1500
command-> inc 1
Alarm #0 Warning: reactor too hot!
Alarm #1 Warning: reactor too hot!
Alarm #2 Warning: reactor too hot!
temp = 1497
command->