PIPES: A Pipeline Toolkit

PIPES is a package containing reusable declarations of pipes and filters. Programmers create filters (transformers, testers, producers, and consumers) by creating extensions of PIPES classes. Pipelines are constructed by instantiating these extension classes, then connecting them with pipes. (Thus, PIPES is a toolkit, not a framework.)

For example, here is how a PIPES user constructs a pipeline that sums the squares of odd integers read from the keyboard. The partial sums are displayed in the console window.

The first step is to import the pipes package, along with the java.io classes used by the NumReader producer:

import pipes.*;
import java.io.*;

Next, we declare a pipeline class. Here the filter types the pipeline needs are declared as inner classes. The constructor builds the pipeline by connecting the filters with pipes, then starts the producer:

public class SOS {

   class Square extends Transformer { ... }
   class OddTester extends Tester { ... }
   class Accum extends Consumer { ... }
   class NumReader extends Producer { ... }
  
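   public SOS() {
      // p1 joins NumReader to OddTester, p2 joins OddTester to Square,
      // and p3 joins Square to Accum
      Pipe p1 = new Pipe(), p2 = new Pipe(), p3 = new Pipe();
      NumReader f1 = new NumReader(p1);
      OddTester f2 = new OddTester(p1, p2);
      Square f3 = new Square(p2, p3);
      Accum f4 = new Accum(p3);
      // only the producer is started explicitly
      f1.start();
   }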
  
   public static void main(String[] args) {
      SOS pipeline = new SOS();
   }
}
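
The internals of Pipe and the filter base classes are not shown in this section. Because the SOS constructor starts only the producer, one plausible reading is a data-driven design in which each pipe hands a written message straight to the filter reading from it. The following is a minimal sketch under that assumption. Every type in it (Receiver, the nested Pipe, OddTester, Square, and Accum) is a hypothetical stand-in written for illustration, not the PIPES API, and the loop in run takes the place of a producer.

```java
import java.util.ArrayList;
import java.util.List;

// A sketch only: every type here is a hypothetical stand-in, not the PIPES API.
class PushPipelineSketch {

    // Anything that can be handed a message by a pipe.
    interface Receiver {
        void receive(Object msg);
    }

    // Forwards each written message to the single filter reading from it.
    static class Pipe {
        private Receiver reader;
        void connect(Receiver r) { reader = r; }
        void write(Object msg) {
            if (reader != null) reader.receive(msg);
        }
    }

    // Tester: passes odd integers downstream and drops even ones.
    static class OddTester implements Receiver {
        private final Pipe out;
        OddTester(Pipe in, Pipe out) { this.out = out; in.connect(this); }
        public void receive(Object msg) {
            int n = (Integer) msg;
            if (n % 2 != 0) out.write(msg);
        }
    }

    // Transformer: replaces each integer with its square.
    static class Square implements Receiver {
        private final Pipe out;
        Square(Pipe in, Pipe out) { this.out = out; in.connect(this); }
        public void receive(Object msg) {
            int n = (Integer) msg;
            out.write(n * n);
        }
    }

    // Consumer: keeps a running total and records each partial sum.
    static class Accum implements Receiver {
        private int accum = 0;
        final List<Integer> partialSums = new ArrayList<>();
        Accum(Pipe in) { in.connect(this); }
        public void receive(Object msg) {
            accum += (Integer) msg;
            partialSums.add(accum);
        }
    }

    // Wires the filters in the same order as SOS, then pushes the inputs
    // into the first pipe in place of a producer.
    static List<Integer> run(int... inputs) {
        Pipe p1 = new Pipe(), p2 = new Pipe(), p3 = new Pipe();
        new OddTester(p1, p2);
        new Square(p2, p3);
        Accum sink = new Accum(p3);
        for (int n : inputs) p1.write(n);
        return sink.partialSums;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 3, 4, 5, 6, 7)); // prints [9, 34, 83]
    }
}
```

In this sketch each filter registers itself with its input pipe in its constructor, which is why wiring the pipeline needs nothing more than constructing the filters in order. The real toolkit may connect and schedule filters differently.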

Program Output

Here is the output produced by a sample run of the program:

enter a number: 2
enter a number: 3
accum = 9
enter a number: 4
enter a number: 5
accum = 34
enter a number: 6
enter a number: 7
accum = 83
enter a number: q
PipelineError: java.lang.NumberFormatException: q

Square Transformer

PIPES provides PipelineError, an extension of Java's Exception class. Filters throw a PipelineError whenever a pipeline-specific error occurs. For example, the Square transformer rejects any message that is not an Integer:

class Square extends Transformer {
   public Square(Pipe ip, Pipe op) {
      super(ip, op);
   }
   public Object transform(Object num) throws PipelineError {
      if (!(num instanceof Integer))
         throw new PipelineError("message must be an integer");
      int n = ((Integer)num).intValue();
      // Integer.valueOf replaces the deprecated new Integer(int) constructor
      return Integer.valueOf(n * n);
   }
}
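
The declaration of PipelineError itself is not shown here. A minimal sketch, assuming it is a checked exception with a single String constructor (which is all the listings in this section rely on), might look like the following. PipelineErrorSketch and SquareCheck are hypothetical names chosen for illustration, and the static square method only mirrors the type check in Square.transform so that it can run without PIPES.

```java
// Hypothetical stand-in for pipes.PipelineError; the real class may differ.
class PipelineErrorSketch extends Exception {
    PipelineErrorSketch(String message) {
        super(message);
    }
}

class SquareCheck {
    // Mirrors the type check in Square.transform without depending on PIPES.
    static Object square(Object num) throws PipelineErrorSketch {
        if (!(num instanceof Integer))
            throw new PipelineErrorSketch("message must be an integer");
        int n = (Integer) num;
        return Integer.valueOf(n * n);
    }

    public static void main(String[] args) {
        try {
            System.out.println(square(7));        // prints 49
            System.out.println(square("seven"));  // throws before printing
        } catch (PipelineErrorSketch e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

Because the sketch extends Exception rather than RuntimeException, the compiler forces every caller either to catch the error or to declare it, which matches the throws PipelineError clauses on the filter methods in this section.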

Odd Tester

class OddTester extends Tester {
   public OddTester(Pipe ip, Pipe op) {
      super(ip, op);
   }
   public boolean test(Object num) throws PipelineError {
      if (!(num instanceof Integer))
         throw new PipelineError("message must be an integer");
      int n = ((Integer)num).intValue();
      return (n%2 != 0);
   }
}

Accumulator (Consumer)

class Accum extends Consumer {
   private int accum = 0;
   public Accum(Pipe ip) {
      super(ip);
   }
   public void consume(Object num) throws PipelineError {
      if (!(num instanceof Integer))
         throw new PipelineError("message must be an integer");
      int n = ((Integer)num).intValue();
      accum += n;
      System.out.println("accum = " + accum);
   }
}

NumReader (Producer)

class NumReader extends Producer {
   BufferedReader stdin;
   public NumReader(Pipe op) {
      super(op);
      stdin = new BufferedReader(new InputStreamReader(System.in));
   }
   public Object produce() throws PipelineError {
      try {
         System.out.print("enter a number: ");
         String s = stdin.readLine();
         return Integer.valueOf(s);
      } catch (IOException | NumberFormatException e) {
         // both failures are reported the same way
         throw new PipelineError("" + e);
      }
   }
}
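
The last line of the sample run comes from this error path: the input q is not a valid integer, so Integer.valueOf throws a NumberFormatException, which produce wraps in a PipelineError. The read-and-parse step can be exercised without a keyboard by wrapping a StringReader instead of System.in. The sketch below does that. ParseDemo and readNumber are hypothetical names, and the method returns a description string instead of throwing so that it runs without PIPES.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

class ParseDemo {
    // Reads one line and parses it, reporting the outcome as a string
    // instead of throwing (a simplification for this sketch).
    static String readNumber(BufferedReader in) {
        try {
            return "ok: " + Integer.valueOf(in.readLine());
        } catch (IOException | NumberFormatException e) {
            return "error: " + e.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // In-memory input standing in for the keyboard: one valid line, one invalid.
        BufferedReader in = new BufferedReader(new StringReader("7\nq\n"));
        System.out.println(readNumber(in)); // prints ok: 7
        System.out.println(readNumber(in)); // prints error: java.lang.NumberFormatException
    }
}
```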