PIPES: A Pipeline Toolkit

In UNIX a filter is a program that reads its input from the standard input stream (stdin) and writes its output to the standard output stream (stdout).
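As an illustration of this definition, here is a minimal sketch of a UNIX-style filter written in Java. The class name UpperFilter and its behavior (upper-casing each line) are purely illustrative. The filtering logic takes a Reader and a Writer so it can be exercised without a console, while main() connects it to stdin and stdout.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.Writer;

// Illustrative UNIX-style filter: copies its input to its output, upper-casing each line.
class UpperFilter {
   // Core logic works on any Reader/Writer pair, not just the console.
   static void filter(Reader in, Writer out) throws IOException {
      BufferedReader reader = new BufferedReader(in);
      PrintWriter writer = new PrintWriter(out);
      String line;
      while ((line = reader.readLine()) != null) { // null signals end of input
         writer.println(line.toUpperCase());
      }
      writer.flush(); // push buffered output through to the underlying writer
   }

   // As a program, read stdin and write stdout like any other filter.
   public static void main(String[] args) throws IOException {
      filter(new InputStreamReader(System.in), new OutputStreamWriter(System.out));
   }
}
```

Compiled, it can be used with redirection like any other filter, for example `java UpperFilter < file1 > file2`.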

Normally, the source of stdin is the keyboard while the destination of stdout is the monitor. However, using pipes and redirection, users can easily reset the source of stdin to be a file or the stdout of another filter. Similarly, the destination of stdout can be a file or the stdin of another filter. Thus, users can chain together filters to form pipelines. For example, the shell command:

filter1 < file1 | filter2 > file2

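creates and starts a two-stage pipeline: file1 is redirected into the stdin of filter1, the stdout of filter1 is piped into the stdin of filter2, and the stdout of filter2 is redirected into file2.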

Note that each filter runs as a separate process, reading from stdin, processing the data read, then writing to stdout. Thus, while filter1 is processing data from later in file1, filter2 is simultaneously processing data that originally came from the start of file1.

Pipeline architectures, along with peer-to-peer and client-server architectures, are popular designs for distributed applications. With this in mind, we will construct a toolkit called PIPES that allows users to assemble pipelines. The PIPES package comes with several predefined classes:

PipeLine: a sequence of connected filters

PipeLineError: exception thrown when problems arise

Filter: abstract base class for all filters

FileInputFilter: extends Filter, input stream is connected to a file

ConsoleInputFilter: extends Filter, input comes from console prompt

FileOutputFilter: extends Filter, output goes to a file

ConsoleOutputFilter: extends Filter, output goes to console

PipeFilter: extends Filter, input and output are piped streams
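The list names PipeLineError but the handout does not show its definition. A minimal sketch, assuming it is a checked exception that carries a message (this fits the `throws PipeLineError` clauses and the `new PipeLineError(String)` calls in the examples that follow):

```java
// Assumed definition: a checked exception carrying a human-readable message.
// Being checked, the compiler forces every caller of update() to handle it.
class PipeLineError extends Exception {
   public PipeLineError(String message) {
      super(message);
   }
}
```

Extending RuntimeException instead would also compile with the examples, so making it checked is a design choice rather than a requirement.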

Basically, a filter is a thread that perpetually reads strings from a reader. When a non-null string is read, the string is processed using an abstract update method that must be implemented in a subclass. The output of the update method, if it's not null, is then written to the filter's writer. Exceptions thrown by the update method must be caught by the filter. If one filter shuts down, the pipeline should shut down all of the filters and terminate.

To use PIPES, the user first creates a few extensions of the PipeFilter class. For example, a user who wants a pipeline for processing integer messages entered at the console might begin with these:

class Square extends PipeFilter {
   // must be at least protected: it overrides the protected Filter.update
   protected String update(String msg) throws PipeLineError {
      if (msg == null) throw new PipeLineError("bad input");
      int val = 0;
      try {
         val = Integer.parseInt(msg);
      } catch (NumberFormatException e) {
         throw new PipeLineError(e.getMessage());
      }
      return String.valueOf(val * val);
   }
}

class isEven extends PipeFilter {
   protected String update(String msg) throws PipeLineError {
      if (msg == null) throw new PipeLineError("bad input");
      int val = 0;
      try {
         val = Integer.parseInt(msg);
      } catch (NumberFormatException e) {
         throw new PipeLineError(e.getMessage());
      }
      return (val % 2 == 0) ? msg : null; // null drops odd values
   }
}

class Accum extends PipeFilter {
   private int total = 0; // running sum of all values seen so far
   protected String update(String msg) throws PipeLineError {
      if (msg == null) throw new PipeLineError("bad input");
      int val = 0;
      try {
         val = Integer.parseInt(msg);
      } catch (NumberFormatException e) {
         throw new PipeLineError(e.getMessage());
      }
      total += val;
      return String.valueOf(total);
   }
}

The PIPES user interface is a console user interface (CUI). Here is how the user makes a pipeline:

-> makeFilter f1 ConsoleInputFilter
ok
-> makeFilter f2 Square
ok
-> makeFilter f3 isEven
ok
-> makeFilter f4 Accum
ok
-> makeFilter f5 ConsoleOutputFilter
ok
-> makePipeLine p1 f1 f2 f3 f4 f5
ok
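The command `makeFilter f2 Square` refers to a filter class by name. One plausible way to handle it, sketched here as an assumption rather than the toolkit's actual design, is Java reflection: `Class.forName` loads the class, `asSubclass` checks that it is a filter, and `getDeclaredConstructor().newInstance()` calls its no-argument constructor. FilterRegistry is a hypothetical name, and the Filter and Square classes below are simplified stand-ins (no threads or pipes) so the example compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the PIPES Filter class: only update() is kept.
abstract class Filter {
   protected abstract String update(String msg);
}

// Simplified stand-in for the user's Square filter; reflection needs its no-arg constructor.
class Square extends Filter {
   public Square() { }
   protected String update(String msg) {
      int val = Integer.parseInt(msg);
      return String.valueOf(val * val);
   }
}

// Hypothetical handler for the CUI command "makeFilter <name> <className>".
class FilterRegistry {
   private final Map<String, Filter> filters = new HashMap<>();

   String makeFilter(String name, String className) {
      try {
         // Load the class by name and insist that it really is a Filter.
         Class<? extends Filter> cls = Class.forName(className).asSubclass(Filter.class);
         filters.put(name, cls.getDeclaredConstructor().newInstance());
         return "ok";
      } catch (ReflectiveOperationException | ClassCastException e) {
         return "error: " + e;
      }
   }

   Filter get(String name) {
      return filters.get(name);
   }
}
```

For classes in the default package, the CUI name (`Square`) is exactly what `Class.forName` expects; a class inside a package would need its fully qualified name.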

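Internally, p1 chains the five filters in order: f1 reads from stdin, each filter's output pipe is connected to the next filter's input pipe (f1 to f2, f2 to f3, f3 to f4, f4 to f5), and f5 writes to stdout.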

Note: stdin and stdout are fields defined in the Console class.

Starting a pipeline starts all of its filters:

-> start p1
messages: 2 3 4 5
4
messages: 6 7 8
20
56

messages: quit
120
pipeline is shutting down

->

The "messages" prompt is perpetually printed by the console input filter, which then reads a line from stdin, breaks it into tokens, then writes each token to its output pipe.

Hint: consider using String.split() instead of StringTokenizer:

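String msgs = stdin.readLine();
for (String token : msgs.trim().split("\\s+")) {
   if (token.isEmpty()) continue;   // a blank line yields one empty token
   outPipe.write(token);
   outPipe.newLine();               // the next filter reads with readLine()
}
outPipe.flush();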
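A pattern of `"\\s"` splits on each single whitespace character, so `"2  3"` (two spaces) yields the tokens `"2"`, `""`, `"3"`, and a blank line yields one empty token. A small helper, sketched here with the hypothetical names Tokens and tokenize, avoids both cases by trimming the line and splitting on runs of whitespace:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class Tokens {
   // Split a console line into tokens; any run of whitespace counts as one separator.
   static List<String> tokenize(String line) {
      String trimmed = line.trim();
      if (trimmed.isEmpty()) {
         return new ArrayList<>(); // blank line: no tokens at all
      }
      return new ArrayList<>(Arrays.asList(trimmed.split("\\s+")));
   }
}
```

For example, `Tokens.tokenize("  2 3\t4  ")` gives the three tokens `2`, `3`, `4`.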

The other lines of output come from the ConsoleOutputFilter; they are simply the successive values of Accum's total. Of course, the output of the console input and output filters is not synchronized in any way, which is why these values do not line up with the prompts.
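The numbers in the session can be checked by hand: the squares of 2 through 8 are 4, 9, 16, 25, 36, 49, 64; isEven keeps 4, 16, 36, 64; and Accum's running total is 4, 20, 56, 120. The sketch below (PipelineCheck is a hypothetical name) applies the same three updates sequentially on one thread, with no pipes, to confirm this:

```java
import java.util.ArrayList;
import java.util.List;

// Single-threaded check of the Square -> isEven -> Accum stages; no pipes involved.
class PipelineCheck {
   static List<Integer> run(String... tokens) {
      List<Integer> printed = new ArrayList<>();
      int total = 0;                       // Accum's running total
      for (String token : tokens) {
         int val = Integer.parseInt(token);
         int squared = val * val;          // Square
         if (squared % 2 != 0) continue;   // isEven drops odd values
         total += squared;                 // Accum
         printed.add(total);               // what ConsoleOutputFilter would print
      }
      return printed;
   }
}
```

Calling `PipelineCheck.run("2", "3", "4", "5", "6", "7", "8")` returns `[4, 20, 56, 120]`, the same values as the session; only their interleaving with the prompts differs, because the real filters run concurrently.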

Design of PIPES

Implementation of PIPES

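import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;

abstract class Filter extends Thread {
   protected BufferedReader inPipe;
   protected BufferedWriter outPipe;
   protected abstract String update(String msg) throws PipeLineError;

   // Write one message as a line and flush, so the next filter's readLine() sees it now.
   protected void send(String msg) throws IOException {
      outPipe.write(msg);
      outPipe.newLine();
      outPipe.flush();
   }

   // Tell the downstream filter to shut down; errors are ignored since we are stopping anyway.
   protected void sendHalt() {
      try {
         send("halt");
      } catch (IOException ignored) {
      }
   }

   public void run() {
      while (true) {
         try {
            String msg = inPipe.readLine();
            // Check for null first: calling equals() on null would throw.
            if (msg == null || msg.equals("halt")) { // end of stream or upstream shutdown
               sendHalt();                          // pass the shutdown downstream
               break;
            }
            msg = update(msg);
            if (msg == null) continue;   // update() chose to drop this message
            send(msg);
         } catch (PipeLineError e) {
            System.err.println(e.getMessage());
            sendHalt();
            break;
         } catch (Exception e) {
            System.err.println(e.getMessage());
            sendHalt();
            break;
         }
      }
   }
   // etc.
}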

Hint: A BufferedReader works like a filter: it can be constructed from another reader, including a PipedReader. Similarly, a BufferedWriter can be constructed from another writer, such as a PipedWriter.
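This hint can be seen in action with a producer thread and a consumer thread connected the way two adjacent filters could be: a PipedWriter connected to a PipedReader, each wrapped in a buffered stream. PipeDemo and transfer are hypothetical names for this sketch. Piped streams are meant to connect two different threads, so the writing end runs in its own thread here. Closing the writer is what lets the reader's readLine() return null, the end-of-stream condition that Filter.run() treats as a shutdown.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.ArrayList;
import java.util.List;

// Sends messages from one thread to another through a buffered pipe.
class PipeDemo {
   static List<String> transfer(List<String> messages) throws IOException, InterruptedException {
      PipedWriter pipeOut = new PipedWriter();
      PipedReader pipeIn = new PipedReader(pipeOut); // connect the two ends of the pipe
      BufferedWriter out = new BufferedWriter(pipeOut);
      BufferedReader in = new BufferedReader(pipeIn);

      // Producer: plays the role of the upstream filter.
      Thread producer = new Thread(() -> {
         try {
            for (String msg : messages) {
               out.write(msg);
               out.newLine();   // readLine() on the other end needs a line terminator
            }
            out.close();        // flushes, then lets the reader see end of stream
         } catch (IOException e) {
            throw new RuntimeException(e);
         }
      });
      producer.start();

      // Consumer (this thread): plays the role of the downstream filter.
      List<String> received = new ArrayList<>();
      String line;
      while ((line = in.readLine()) != null) {
         received.add(line);
      }
      producer.join();
      in.close();
      return received;
   }
}
```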

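import java.util.List;

public class PipeLine extends Thread {
   private List<Filter> filters;
   public Filter makeFilter() { ??? }
   public PipeLine(List<Filter> filters) { ??? }
   public void run() {
      for (Filter f : filters) {   // start all filters
         f.start();
      }
      try {                        // wait for the last filter to stop
         filters.get(filters.size() - 1).join();
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
   }
   // etc?
}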