Message Handlers

Threads usually communicate through shared data structures, but they can also communicate by message passing. Message passing is useful when the threads are separated by process boundaries, and the technique is just as important for threads within a single process. Assume t1 and t2 are active objects. If t2 has a method called M, and t1 calls t2.M(), then M runs in t1's thread, not in t2's. How can t1 get t2.M() to run in t2's thread? If t2 can receive messages, then t1 can send it a request asking it to invoke M:

// Tag the message as a REQUEST: the one-argument Message constructor
// would label it INFORM, which does not match "asking it to invoke M".
t2.send(new Message("please invoke M", Message.REQUEST));

Messages

/**
 * A piece of text tagged with one of three integer type codes
 * (QUERY, REQUEST or INFORM).
 */
class Message {
   /** Type codes accepted by the two-argument constructor. */
   public static final int QUERY = 0;
   public static final int REQUEST = 1;
   public static final int INFORM = 2;

   private String content;
   private int type;

   /** Creates a message with the given text and type code. */
   public Message(String content, int type) {
      this.content = content;
      this.type = type;
   }

   /** Creates a message with the given text and the default type, INFORM. */
   public Message(String content) {
      this.content = content;
      this.type = INFORM;
   }

   /** Returns the type code this message was created with. */
   public int getType() {
      return this.type;
   }

   /** Returns the text this message was created with. */
   public String getContent() {
      return this.content;
   }

   /** Renders the message as "content: <text>, type = <code>". */
   public String toString() {
      StringBuilder text = new StringBuilder("content: ");
      text.append(content);
      text.append(", type = ");
      text.append(type);
      return text.toString();
   }
}

Message Handlers

class Handler extends Thread {
   protected Message msg = null;
   public Handler(Message m) { msg = m; }
   // override in a subclass:
   protected void handle() {
      System.out.println("(Handling: " + msg + ")");
   }
   public void run() {
      if (msg == null) return;
      handle();
   }
}

class QueryHandler extends Handler {
   public QueryHandler(Message msg) { super(msg); }
}

class RequestHandler extends Handler {
   public RequestHandler(Message msg) { super(msg); }
}

class InfoHandler extends Handler {
   public InfoHandler(Message msg) { super(msg); }
}

Listeners

class Listener extends Thread {

   private List messages = new ArrayList();
   public void send(Message msg) {
      synchronized(messages) { messages.add(msg); }
   }

   private boolean run = true;
   public void kill() { run = false; }

   protected Handler makeHandler(Message msg) {
      switch(msg.getType()) {
         case Message.QUERY: return new QueryHandler(msg);
         case Message.REQUEST: return new RequestHandler(msg);
         case Message.INFO: return new InfoHandler(msg);
         default: return null;
      }
   }
  
   public void run() {
      while(run) {
         synchronized(messages) {
            Iterator p = messages.iterator();
            while (p.hasNext()) {
               Message msg = (Message)p.next();
               p.remove();
               Handler slave = makeHandler(msg);
               slave.start();
            } // while
         } // synchronized
         try {
            Thread.sleep(500);
         } catch (InterruptedException ie) {
            System.err.println(ie.getMessage());
         }
      } // while
      System.out.println("Server is shutting down");
   }
   // etc.
}

Test driver


   public static void main(String[] args) {

      Listener server = new Listener();
      System.out.println("Starting server");
      server.start();
      while(true) {
         jutil.IOUtils.print("\nmessage for server ->");
         String msg = jutil.IOUtils.readLine();
         if (msg.equals("quit")) break;
         server.send(new Message(msg));
      }
      server.kill();
      System.out.println("Master will now die");
   }
}

Program output

Starting server

message for server ->aaa

message for server ->
(Handling: content: aaa, type = 2)
bbb

message for server ->
(Handling: content: bbb, type = 2)
ccc

message for server ->
(Handling: content: ccc, type = 2)
quit
Master will now die
Server is shutting down