The Dispense Package

Short Description

The Dispense Package is designed to be an out-of-the-box way to do distributed programming.  For example, suppose you want to count the number of twin primes (primes p for which p-2 is also prime).  Suppose you have a program count-primes which reads a number b from each line of its input, and outputs the number of twin primes between 1,000,000,000 b and 1,000,000,000 (b+1).  If you want a count of twin primes up to 10^14, you create a file primes-input which consists of lines, each containing an integer, starting at 0 and ending at 99999.  So to get the desired count, you would type:

    ./count-primes < primes-input > primes-output

and then add up all the numbers in the file primes-output.
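Both the input file and the final sum are easy to produce with a few lines of C. The following is a minimal sketch. The helper names write_range_input and sum_counts are hypothetical and are not part of the package. It assumes that primes-output holds one integer per line.

```c
#include <stdio.h>

/* Write one integer per line, from first to last inclusive.
   For the twin-prime example: write_range_input(f, 0, 99999). */
int write_range_input(FILE *out, long first, long last) {
  for (long b = first; b <= last; b++)
    if (fprintf(out, "%ld\n", b) < 0)
      return -1;                      /* write error */
  return 0;
}

/* Add up the integers in a file such as primes-output. */
long long sum_counts(FILE *in) {
  long long total = 0, value;
  while (fscanf(in, "%lld", &value) == 1)
    total += value;
  return total;
}
```

To use it, call write_range_input(stdout, 0, 99999) from a small main and redirect the result into primes-input. Calling sum_counts(stdin) on primes-output then does the adding up.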

Well, this program could take a long time to run, so you want to spread the load over several computers.  This package allows you to do that.  You have a central task-dispensing computer (call it dispense.domain) where you run the program:

    dispense pass-key 3000 10 < primes-input > primes-output

and then on several other computers, you type

    enact ./count-primes pass-key dispense.domain 3000

Each enact program asks the dispense program for a line from primes-input.  (In this instance, the dispense program is listening on port 3000, and uses pass-key to verify that the enact program is working on the correct problem.)  The enact program works on that problem piece, and then sends the answer back to the dispense program.

If for some reason the enact program dies, after 10 minutes the dispense program will give that same problem piece to another enact program.  The dispense program collects all the outputs from the enact programs, and places them in primes-output, along with other information such as the IP address of the computer which did that particular computation.

How to Build

This package was designed on FreeBSD.  It should work fine on Linux.  If you want to help me port it to other Unixes (or even Windows), that would be great.

This package needs version 3 of the Berkeley database (Berkeley DB).  You can find it at http://www.sleepycat.com/

Download the package dispense-0.23.tar.gz.  After untarring the distribution and changing to the created directory, type

    make all-enact
    make install-enact

to create and install the enact programs, and

    make all-dispense
    make install-dispense

to make and install the dispense programs.  To make and install everything, type

    make all
    make install

How to Use

To use this package, you will need to create two items.  You need to create a program that does your computations, and you need to create a file that has a list of input values to your program.  (For example, in this package, there is an example of a program called add, that takes two numbers and outputs their sum.)

The Program

The program should accept one line of input, and then create one line of output in response.  It should repeat this until its input runs out.  The basic structure of the program should be something like this:

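#include <stdio.h>
#include <stdlib.h>

void do_computations(char *input, char *output);  /* you write this */

int main(void) {
  char input[1000], output[1000];
  setlinebuf(stdout);  /* flush each line as soon as it is written */
  while (fgets(input, 1000, stdin) != NULL) {
    do_computations(input, output);
    fputs(output, stdout);  /* output already ends in "\n" */
  }
  exit(0);
}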

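Be sure that output consists of a single line terminated by the sequence "\n\0", that is, a newline followed by the string's terminating null byte.  The setlinebuf(stdout) call is necessary, unless you use the -p option described below.  When stdout is a pipe rather than a terminal, the C library normally holds output back in a large buffer, so without this call your answer could sit in your program's buffer instead of reaching enact.  Your program should always produce some kind of output.  If the input data is invalid, you might like to have it output some kind of error message.  The program may be a script.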
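As an illustration, here is one possible do_computations for an add-style program, where the input line holds two numbers and the output line holds their sum. This is a sketch written for this explanation and is not the source of the package's add example. It assumes the output buffer has 1000 bytes, as in the skeleton. Two rules from the text above are followed: the output always ends in "\n", and malformed input produces an error line instead of nothing.

```c
#include <stdio.h>

/* Read two numbers from input and write their sum, newline-terminated,
   into output (assumed to hold 1000 bytes).  Anything other than exactly
   two numbers produces an error line, so the program always answers. */
void do_computations(char *input, char *output) {
  double a, b;
  char extra;
  /* sscanf returns 2 only if two numbers are followed by nothing else */
  if (sscanf(input, "%lf %lf %c", &a, &b, &extra) == 2)
    snprintf(output, 1000, "%.15g\n", a + b);
  else
    snprintf(output, 1000, "error: expected two numbers\n");
}
```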

The Enacting Program

On each computer where you wish to do the computations, you start the enact program as follows:

    enact freddy program-name dispense.domain 3000

where 3000 is replaced by whatever port number the dispense program is using, and "freddy" is a password so that the dispense program knows that it is communicating with a legitimate enact program.

You may run this program with some options:

The Dispensing Program

One computer serves as the central dispenser, which gives out the pieces of the problem.  On this computer, run the dispense program as follows:

    dispense freddy 3000 5 < input-file >> output-file

The word "freddy" is a password so that only legitimate enact programs can connect.  Replace 3000 with the port you want to use.  Generally any number between 1024 and 65535 should be OK.  If you get a "cannot bind" message when you try to start the dispense program, wait a minute, or try another port (see the remarks below).  The last number, in our case 5, is the minimum number of minutes to wait until the dispense program decides that an enact program has taken too long to solve a particular problem piece.

You may run this program with a couple of options.

Format of Output File

Each line in the input file will result in 3 lines in the output file, which will look something like this:

Data from 1.2.3.4:11 (fred@dom.com) [Thu Dec  6 08:39:13 2001 -6:00]
input data
output data

(The identifier in parentheses will only be there if the enact program is run with the -i option.)

At the beginning of the output file will be the single line

Starting [Thu Dec  6 08:30:56 2001 -6:00]

and when the dispense program finishes, it will put at the end of the output file the single line

Finished [Thu Dec  6 08:39:19 2001 -6:00]
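Because each result is a fixed three-line record, the output file is simple to post-process. The sketch below totals the answers for the twin-prime example. It assumes that every output line starts with an integer. The function name sum_dispense_outputs is hypothetical. "Starting" and "Finished" lines are skipped, and so are the headers of any "Intermediate from" records.

```c
#include <stdio.h>
#include <string.h>

/* Sum the output line of every "Data from" record in a dispense output
   file; *records receives the number of records summed. */
long long sum_dispense_outputs(FILE *in, long *records) {
  char line[50001], input[50001];     /* lines are < 50,000 characters */
  long long total = 0, value;
  *records = 0;
  while (fgets(line, sizeof line, in) != NULL) {
    if (strncmp(line, "Data from ", 10) != 0)
      continue;                       /* "Starting", "Finished", ... */
    /* the next two lines are the input line and the output line */
    if (fgets(input, sizeof input, in) == NULL ||
        fgets(line, sizeof line, in) == NULL)
      break;
    if (sscanf(line, "%lld", &value) == 1) {
      total += value;
      (*records)++;
    }
  }
  return total;
}
```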

If you want to see how far the project has progressed, count the number of lines in the output file by typing

    wc -l output-file

When the project is finished, the output file will have 3n + 2 lines, where n is the number of lines in the input file: the "Starting" line, three lines for each input line, and the "Finished" line.
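The line-count arithmetic can be turned into a progress figure. This sketch assumes the output file comes from a single run of dispense, so it has one "Starting" line and no intermediate results. The function names are hypothetical.

```c
/* Number of input lines already answered, given the output file's line
   count: drop the "Starting" line, then count 3-line records.  Integer
   division makes a trailing "Finished" line harmless. */
long pieces_done(long output_lines) {
  if (output_lines < 1)
    return 0;
  return (output_lines - 1) / 3;
}

/* A finished project has 3n + 2 lines for n input lines. */
int project_finished(long output_lines, long input_lines) {
  return output_lines == 3 * input_lines + 2;
}
```

For example, an output file of 31 lines means 10 pieces are done. If the input file had 100 lines, the project is complete when the output file reaches 302 lines.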

If you want to see which hosts have submitted data, and how many lines each has submitted, type

    check-ips output-file
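The source of check-ips is not reproduced here. The sketch below only shows the idea of tallying records per host in C. It assumes that the host is the text after "Data from " up to the first ':' or space, as in the sample header "Data from 1.2.3.4:11". The struct and function names are hypothetical.

```c
#include <stdio.h>
#include <string.h>

struct host_count { char host[64]; long records; };

/* Count "Data from" records per host.  Returns the number of distinct
   hosts stored in hosts[] (at most max_hosts). */
int tally_hosts(FILE *in, struct host_count *hosts, int max_hosts) {
  char line[50001];
  int n = 0;
  while (fgets(line, sizeof line, in) != NULL) {
    if (strncmp(line, "Data from ", 10) != 0)
      continue;
    const char *start = line + 10;
    size_t len = strcspn(start, ": \n");   /* assumed end of host field */
    if (len == 0 || len >= sizeof hosts[0].host)
      continue;
    int i = 0;
    while (i < n && !(strlen(hosts[i].host) == len &&
                      strncmp(hosts[i].host, start, len) == 0))
      i++;
    if (i == n) {                          /* first record from this host */
      if (n == max_hosts)
        continue;                          /* table full: skip new hosts */
      memcpy(hosts[n].host, start, len);
      hosts[n].host[len] = '\0';
      hosts[n].records = 0;
      n++;
    }
    hosts[i].records++;
  }
  return n;
}
```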

Remarks

1.  Make sure that each problem piece takes a reasonable amount of time.  Otherwise you might flood your internet connection.  If your input file consists of some lines that will take a long time and some lines that will take a short time, you can mix up the input file using the randomize command:

    randomize < input-file > randomized-input-file

2.  The purpose of the password is not to provide any kind of real security --- rather if one is doing several projects, it helps to stop them getting mixed up (so you should give each project a different password).

3.  If the dispense program quits for some reason, on many versions of Unix the port that it was using may not be available for a minute or so.  If you try to restart the dispense program during that time, you will get a "cannot bind" error message.  The solution is to wait a minute or two.

4.  The input lines should be unique.  If you require duplicate input lines (for example, maybe you are doing Monte Carlo integration), add an artificial extra parameter to each input line, simply to make them unique.

5.  The lines of input and output must be less than 50,000 characters.

6.  This program is early in its development, and it is the author's first exercise in threaded internet applications, so it may be buggy.

7.  This package includes a directory example-primes, which contains a program to calculate the number of primes p for which p-a1, p-a2, ..., p-ak are also prime, for selected sequences a1, a2, ..., ak.  If you only wish to count twin primes, you may be better off with the sophisticated code available at http://www.cs.rpi.edu/research/twinp/.

8.  This package also includes a directory example-polyominoes, which are programs to count the number of ways to place polyominoes into various shapes.  The code is derived from the package polyomino-0.4, available at http://faculty.missouri.edu/~stephen/software/#polyominoes, and was used to verify some of the results at http://www.xs4all.nl/~gp/PolyominoSolver/Polyomino.html.
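Remark 1 suggests mixing up the input file. A standard way to do that is a Fisher-Yates shuffle of the lines held in memory. The following is a sketch of that technique, not necessarily how the package's randomize command works. The function names are hypothetical. It uses rand(), so call srand() first if you want a different order on each run.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Fisher-Yates shuffle of n line pointers, in place. */
static void shuffle_lines(char **lines, size_t n) {
  for (size_t i = n; i > 1; i--) {
    size_t j = (size_t)rand() % i;     /* pick j in 0 .. i-1 */
    char *tmp = lines[i - 1];
    lines[i - 1] = lines[j];
    lines[j] = tmp;
  }
}

static void free_lines(char **lines, size_t n) {
  for (size_t i = 0; i < n; i++)
    free(lines[i]);
  free(lines);
}

/* Read every line of in, shuffle, and write to out.  A final line with no
   newline gets one.  Returns the line count, or -1 if memory runs out. */
long randomize_file(FILE *in, FILE *out) {
  static char buf[50001];              /* input lines are < 50,000 chars */
  char **lines = NULL;
  size_t n = 0, cap = 0;
  while (fgets(buf, sizeof buf, in) != NULL) {
    size_t len = strlen(buf);
    int add_newline = (len == 0 || buf[len - 1] != '\n');
    if (n == cap) {                    /* grow the pointer array */
      size_t newcap = cap ? 2 * cap : 1024;
      char **p = realloc(lines, newcap * sizeof *p);
      if (p == NULL) { free_lines(lines, n); return -1; }
      lines = p;
      cap = newcap;
    }
    if ((lines[n] = malloc(len + 2)) == NULL) { free_lines(lines, n); return -1; }
    memcpy(lines[n], buf, len);
    if (add_newline)
      lines[n][len++] = '\n';
    lines[n][len] = '\0';
    n++;
  }
  shuffle_lines(lines, n);
  for (size_t i = 0; i < n; i++)
    fputs(lines[i], out);
  free_lines(lines, n);
  return (long)n;
}
```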

What if the Programs Die or the Connection goes down?

If an enact program dies, that is not a problem.  If that enact program was working on a particular problem piece, the dispense program will wait a while, then give that piece to another enact program.  You may restart the enact program any time you like.

If the dispense program dies, the enact programs will keep retrying to connect every minute for up to two days.  Before restarting the dispense program, you want to weed the input file so that lines that have already been computed don't get done again.  So type

    filter-input -o output-file < input-file > new-input-file

to create a new input file.  (filter-input will also check for duplicate input lines.)
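The core of this weeding step can be sketched in C. The sketch collects the input line of every "Data from" record in the output file, sorts them, and passes through only the input lines with no recorded result. It is a simplified illustration with the hypothetical name filter_done. Unlike filter-input, it does not check for duplicates and does not handle a queue file. Lines are compared byte for byte, including the trailing newline.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static int cmp_lines(const void *a, const void *b) {
  return strcmp(*(char *const *)a, *(char *const *)b);
}

/* Copy each line of input to out unless it already appears as the input
   line of a "Data from" record in output.  Returns the number of lines
   written, or -1 if memory runs out. */
long filter_done(FILE *output, FILE *input, FILE *out) {
  static char buf[50001];              /* lines are < 50,000 characters */
  char **done = NULL;
  size_t n = 0, cap = 0;
  long written = 0;

  /* Pass 1: collect the input lines of finished records. */
  while (fgets(buf, sizeof buf, output) != NULL) {
    if (strncmp(buf, "Data from ", 10) != 0)
      continue;                        /* "Starting", "Finished", ... */
    if (fgets(buf, sizeof buf, output) == NULL)
      break;                           /* truncated record */
    if (n == cap) {
      size_t newcap = cap ? 2 * cap : 1024;
      char **p = realloc(done, newcap * sizeof *p);
      if (p == NULL) { written = -1; goto cleanup; }
      done = p;
      cap = newcap;
    }
    if ((done[n] = malloc(strlen(buf) + 1)) == NULL) { written = -1; goto cleanup; }
    strcpy(done[n++], buf);
    if (fgets(buf, sizeof buf, output) == NULL)
      break;                           /* skip the output line */
  }
  if (n > 0)
    qsort(done, n, sizeof *done, cmp_lines);

  /* Pass 2: keep only input lines with no recorded result. */
  while (fgets(buf, sizeof buf, input) != NULL) {
    char *key = buf;
    if (n == 0 || bsearch(&key, done, n, sizeof *done, cmp_lines) == NULL) {
      fputs(buf, out);
      written++;
    }
  }
cleanup:
  for (size_t i = 0; i < n; i++)
    free(done[i]);
  free(done);
  return written;
}
```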

Optional Topics:

Killing the Project

If you simply wish to stop the dispense program, use the kill command.  If you wish to kill the project, that is, to have the dispense program continue to run, but for all the enact programs that contact it to quit, use the kill command as follows:

    kill -USR1 pid

where pid is the process identifier of the dispense program (use ps to find this process identifier).  On some systems, you may be able to achieve the same effect with the command

    killall -USR1 dispense

Security for the Dispensing Program

If your system supports it, it is possible to have incoming connections to dispense checked using tcp_wrappers.  Uncomment the line
#LIBWRAP=-DLIBWRAP -lwrap
in the Makefile before building the software, and then run the dispense program with the -w option.  Refer to your local documentation on how to use tcp_wrappers on your system.

Make Your Own Enacting Programs

You can make your own special purpose enacting programs.  They go something like this (you write the do_computations function):

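#include <stddef.h>
#include <dispense.h>

void do_computations(char *input, char *output);  /* you write this */

int main(int argc, char **argv) {
  char *input = NULL;        /* filled in by get_dispensed */
  char output[1000];
  prepare_enact(argc, argv, "", 0);
  while (1) {
    get_dispensed(&input);   /* fetch the next input line */
    do_computations(input, output);
    send_enacted(output);    /* return the result for this line */
  }
}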

Compile the program like this:

    cc enact-my-job.c -o enact-my-job -ldispense -pthread

Invoke the program thus:

    ./enact-my-job pass-key dispense.domain 3000

with the options -f, -n and -t if you like.

Catching the Output of the Dispense Program

The dispense program outputs some useful messages to standard error (for example, abortive attempts to connect, or how the dispense.allow file is interpreted).  Normally these messages will simply be output to the screen.  You can save these messages, depending upon which shell you are using.  If you are using sh or bash, type something like

    dispense freddy 3000 5 < input-file >> output-file 2> dispense.log &

If you are using csh or tcsh, the following will work:

    (dispense freddy 3000 5 < input-file >> output-file) >& dispense.log &

Keep the space between the timeout and the < sign.  In sh and bash, 5<input-file written without a space is read as a redirection of file descriptor 5, so dispense would never see the timeout argument, and its stdin would not come from input-file.  Note the use of >> instead of > in collecting the output to output-file.  This is appropriate if for some reason you are restarting dispense, as it appends to output-file instead of writing over it.  Note also the final & at the end of the command line --- this will cause dispense to run in the background.

Keeping the Queue Database when Stopping and Starting the Dispense Program

It is possible that you may wish to stop the dispense program and then restart it (perhaps to change some parameter like the timeout, or to add more data to the input file).  This can mean that some computations are duplicated, because the program loses its database of inputs that have been sent out but whose outputs have not yet been received.  To avoid this, if you kill dispense with the kill command, dispense will send a list of all items in its queue to stderr.  You may store these values in a file, and then invoke dispense with the -q option and the name of this file.  This will greatly reduce the number of duplicated computations.  You can extract the most recent queue listed in your log file with the command:

    log-to-queue < dispense.log > queue-file

Each item in the resulting queue-file has two lines.  The first line is the actual input line.  The second line has two numbers: first, how many minutes the item has left before it times out (or -1 if it is ready for use), and second, how many different enact programs have asked to work on this line.

You can remove the appropriate inputs with a command line something like:

    filter-input -o output-file -q queue-file < input-file > new-input-file
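The two-line queue-file layout can be read with a small C routine. The following is a sketch under two assumptions: the two numbers on the second line are separated by whitespace, and input lines are shorter than 50,000 characters. The struct and function names are hypothetical.

```c
#include <stdio.h>
#include <string.h>

/* One queue-file item: the input line, the minutes left before it times
   out (-1 if ready for use), and how many enact programs asked for it. */
struct queue_entry { char input[50001]; int minutes; int requests; };

/* Read the next item.  Returns 1 on success, 0 at end of file or on a
   malformed item. */
int read_queue_entry(FILE *f, struct queue_entry *e) {
  char counts[128];
  if (fgets(e->input, sizeof e->input, f) == NULL)
    return 0;
  e->input[strcspn(e->input, "\n")] = '\0';   /* drop the newline */
  if (fgets(counts, sizeof counts, f) == NULL)
    return 0;
  return sscanf(counts, "%d %d", &e->minutes, &e->requests) == 2;
}
```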

Sending intermediate results

There is another protocol for sending intermediate results.  This is useful if the calculation of each piece takes a very long time, and you want to get a glimpse of upcoming results as soon as possible.  This is only possible in a "home rolled" enact program.  This is done by calling the send_intermediate(char *s) function where s is a '\n' terminated string that you want to send.

How it Works (The Protocol)

Overview

The dispense and enact programs have a server-client relationship --- the dispense program acts as the server, and the enact program acts as the client.  Thus it is the enact program that initiates contact, both to request data to use as input to the program program-name (specified in the argument list of enact), and to send data obtained as output from program-name.

Note that all lines sent and received over the TCP/IP network by these programs must be terminated by the sequence "\r\n", otherwise the programs will not accept these lines.

The Enacting Program

When the enact program is started, it attempts to connect to the dispense.domain host on the appropriate port, as specified in the argument list.  If it does not succeed, it will keep trying again every minute, until two days are up, when the program will quit.  After it connects, it sends the lines

    connection from enact
    pass-key
    number

to dispense.domain, where number is the requested timeout (0 if none is requested).  If the enact program is started with the -i option, then the first line is replaced by

    connection from enact: identifier
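The opening handshake is just three "\r\n"-terminated lines. This sketch builds them into a buffer ready to be written to the socket. The function name format_hello is hypothetical. Pass a NULL identifier when -i is not in use.

```c
#include <stdio.h>

/* Build the opening lines enact sends after connecting, each terminated
   by "\r\n".  identifier is NULL unless enact was started with -i, and
   timeout is the requested timeout (0 if none).  Returns the number of
   bytes written, or -1 if buf is too small. */
int format_hello(char *buf, size_t size, const char *pass_key,
                 int timeout, const char *identifier) {
  int len;
  if (identifier != NULL)
    len = snprintf(buf, size, "connection from enact: %s\r\n%s\r\n%d\r\n",
                   identifier, pass_key, timeout);
  else
    len = snprintf(buf, size, "connection from enact\r\n%s\r\n%d\r\n",
                   pass_key, timeout);
  return (len < 0 || (size_t)len >= size) ? -1 : len;
}
```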

If the enact program wants data to work on, it sends the line

    request data

It then expects to receive one line of data, the input line.  If the input line is "Finished", the enact program quits.  If the input line is "No data" this means that there is no data available right now, but the project is not finished.  The enact program will wait a minute, and then ask again.
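On the enact side, each reply to "request data" falls into one of three cases. This sketch classifies a received line after stripping its "\r\n" terminator in place. The enum and function names are hypothetical.

```c
#include <string.h>

enum reply { REPLY_DATA, REPLY_NO_DATA, REPLY_FINISHED };

/* Classify one line received after "request data".  The "\r\n" is
   stripped in place, so on REPLY_DATA line holds just the input. */
enum reply classify_reply(char *line) {
  line[strcspn(line, "\r\n")] = '\0';
  if (strcmp(line, "Finished") == 0)
    return REPLY_FINISHED;   /* project over: enact quits */
  if (strcmp(line, "No data") == 0)
    return REPLY_NO_DATA;    /* nothing yet: wait a minute, ask again */
  return REPLY_DATA;         /* an input line to work on */
}
```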

If the enact program wants to send the results of computations, it sends three lines

    sending data
    input data
    output data

If the enact program fails to send or receive data in any way, or if the TCP/IP connection is inactive for over a minute, the connection is broken.  When more communication is required, the connection is reestablished as described above.

If enact is working with an internal queue of length greater than one, it may request or send data in batches.  The protocol for requesting data is then something like

    request data 6

to which dispense should respond with 6 lines of input.  These might include lines like "No data" or "Finished", in which case no more lines of input will be sent (even if it means sending fewer than 6 lines).  The protocol for sending data will be

    sending data 2
    input line 1
    output line 1
    input line 2
    output line 2

The maximum number of lines that may be requested or sent in this manner is 256.
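A batched "sending data" message can be assembled as below. This is a sketch. The names format_send and MAX_BATCH are hypothetical. Following the single-result form shown earlier, the count is left off when only one pair is sent. The inputs and outputs must not contain line breaks of their own.

```c
#include <stdio.h>

#define MAX_BATCH 256   /* protocol limit on lines per request or send */

/* Format a "sending data" message for n (input, output) pairs, every line
   terminated by "\r\n".  Returns the message length, or -1 if n is out of
   range or buf is too small. */
int format_send(char *buf, size_t size, const char *const *inputs,
                const char *const *outputs, int n) {
  int len;
  size_t used;
  if (n < 1 || n > MAX_BATCH)
    return -1;
  len = (n == 1) ? snprintf(buf, size, "sending data\r\n")
                 : snprintf(buf, size, "sending data %d\r\n", n);
  if (len < 0 || (size_t)len >= size)
    return -1;
  used = (size_t)len;
  for (int i = 0; i < n; i++) {       /* one input/output pair per step */
    len = snprintf(buf + used, size - used, "%s\r\n%s\r\n",
                   inputs[i], outputs[i]);
    if (len < 0 || (size_t)len >= size - used)
      return -1;
    used += (size_t)len;
  }
  return (int)used;
}
```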

The Dispensing Program

The dispense program attempts to dispense the various lines sent to it via stdin to the enact programs that connect to it.  After the enact programs have worked on each line of input, the dispense program collects back the outputs from the enact programs, and sends those outputs, with their corresponding inputs, to stdout.

The program maintains two internal databases, a queue of inputs queue_db, and a list of inputs ready to use, use_db.  Initially both of these databases are empty.  The items in queue_db have a number attached to them, which represents the number of minutes before that item should be moved to use_db.  Every minute, the dispense program runs a de_queue sub-program, which decreases these numbers in queue_db by one, and places those with a negative number into use_db.

The dispense program listens on the appropriate port.  When it receives a connection (and assuming that the connection passes the rules in dispense.allow, otherwise the connection is immediately terminated), it looks at the first three lines.  It expects the first line to be "connection from enact", the second line to match pass-key, and the third line to be a number which is a request timeout.  Then the program enters into a loop, providing or receiving data as requested, until either the client breaks the connection, or until the protocol is broken.

If the next line is "request data", the program looks in its database use_db.  If there are any items there, it sends one of them down the connection.  If not, the dispense program gets a line from stdin, and if there is such a line, sends that.  If stdin is at the end of file, the dispense program repeatedly calls the de_queue sub-program until there is an item in use_db, which it then sends.  If the queue_db is also empty, the dispense program acts as follows depending upon whether the -c option has been set or not.  If the option is set, the program simply quits.  Otherwise the program sends the line "Finished".

Then the dispense program puts the sent data into queue_db, with an attached number equal to the timeout value specified in the argument list.

If the next line received is "sending data", the dispense program receives two more lines, the input line and the output line.  It checks whether the input line is in queue_db or use_db.  If it is, the item is removed from whichever database it is in, and the input and output lines are sent to stdout, along with information about which host sent the data.
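The bookkeeping described in this section can be modelled in a few dozen lines. The following is a toy sketch of the logic only, not dispense's implementation. It uses small fixed-size in-memory arrays where the real program uses Berkeley DB. The names queue_db, use_db and de_queue follow the text, while MAX_ITEMS, LINE_LEN, request_data and receive_result are hypothetical. It shows the behaviour without -c, and it ignores results for lines that are in neither database.

```c
#include <stdio.h>
#include <string.h>

#define MAX_ITEMS 64    /* toy capacity */
#define LINE_LEN 256    /* toy line limit; real lines may be much longer */

/* queue_db: lines handed out, with minutes left before they time out.
   use_db: lines that are ready to be handed out (again). */
struct queued { char line[LINE_LEN]; int minutes; };
static struct queued queue_db[MAX_ITEMS];
static char use_db[MAX_ITEMS][LINE_LEN];
static int nqueue = 0, nuse = 0;

/* Called once a minute: decrement every counter in queue_db and move
   entries whose counter has gone negative into use_db. */
void de_queue(void) {
  int kept = 0;
  for (int i = 0; i < nqueue; i++) {
    if (--queue_db[i].minutes < 0 && nuse < MAX_ITEMS)
      strcpy(use_db[nuse++], queue_db[i].line);
    else
      queue_db[kept++] = queue_db[i];
  }
  nqueue = kept;
}

/* Answer "request data".  Fills reply (LINE_LEN bytes) with the line to
   send and returns 1, or fills it with "Finished" and returns 0. */
int request_data(FILE *in, int timeout, char *reply) {
  if (nuse > 0) {
    strcpy(reply, use_db[--nuse]);             /* a timed-out line */
  } else if (fgets(reply, LINE_LEN, in) != NULL) {
    reply[strcspn(reply, "\n")] = '\0';        /* a fresh stdin line */
  } else {
    while (nuse == 0 && nqueue > 0)            /* stdin at EOF: force */
      de_queue();                              /* early timeouts      */
    if (nuse == 0) {
      strcpy(reply, "Finished");
      return 0;
    }
    strcpy(reply, use_db[--nuse]);
  }
  if (nqueue < MAX_ITEMS) {                    /* start its timeout */
    strcpy(queue_db[nqueue].line, reply);
    queue_db[nqueue++].minutes = timeout;
  }
  return 1;
}

/* Answer "sending data": if input is still in queue_db or use_db, remove
   it and return 1, so the caller writes the record to stdout. */
int receive_result(const char *input) {
  for (int i = 0; i < nqueue; i++)
    if (strcmp(queue_db[i].line, input) == 0) {
      queue_db[i] = queue_db[--nqueue];
      return 1;
    }
  for (int i = 0; i < nuse; i++)
    if (strcmp(use_db[i], input) == 0) {
      nuse--;
      if (i != nuse)
        strcpy(use_db[i], use_db[nuse]);
      return 1;
    }
  return 0;
}
```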

Querying Inputs

If the enact program has a large amount of queued data, it may ask the dispense program if certain input lines are still in the queue_db or use_db.  This is useful so that the enact program can check to see if certain input lines are still useful computations to make (perhaps because some other enact program may have finished working on it).  In this case, the enact program sends lines like

    query 3
    line 1
    line 2
    line 3

where this example is asking to check on three input lines.  (The maximum number of lines that may be queried at a time is 256.)  The dispense program then answers with a single line:

    yny

where each character ('y' for yes, 'n' for no) corresponds to one of the queried input lines, in order.  (If enact wishes only to query one line, the number may be omitted.)
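Building the one-line answer is a simple membership test per queried line. In this sketch, a plain array called pending stands in for the contents of queue_db and use_db. That array and the function names are hypothetical.

```c
#include <string.h>

/* pending[] stands in for the contents of queue_db and use_db. */
static int is_pending(const char *line, const char *const *pending,
                      int npending) {
  for (int i = 0; i < npending; i++)
    if (strcmp(line, pending[i]) == 0)
      return 1;
  return 0;
}

/* Build the reply to "query n": one 'y' or 'n' per queried line, in
   order, then "\r\n".  answer must have room for n + 3 bytes. */
void answer_query(const char *const *lines, int n,
                  const char *const *pending, int npending, char *answer) {
  int i;
  for (i = 0; i < n; i++)
    answer[i] = is_pending(lines[i], pending, npending) ? 'y' : 'n';
  answer[i++] = '\r';
  answer[i++] = '\n';
  answer[i] = '\0';
}
```

With "line 1" and "line 3" still pending, querying the three lines from the example above yields "yny".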

Relinquishing Inputs

The enact program might decide that it does not have time to work on some of its inputs.  In that case it may relinquish its hold on those input lines, so that other enact programs may receive them immediately (rather than waiting for them to time out):

    relinquish data 3
    line 1
    line 2
    line 3

Again, if enact only wants to relinquish one line, then the number is not needed.

Sending Intermediate Results

The enact program can send intermediate results to the server program using lines like

    sending intermediate 2
    input line 1
    output line 1
    input line 2
    output line 2

As usual the number is not required if enact only wants to send one line.  These will appear in stdout of dispense as lines like

Intermediate from xx.xx.xx.xx [date-time]
input line
output line

License Terms

This program is free software, and available under a very non-restrictive BSD style license.  Read the file LICENSE for details.

History

This package is derived from special purpose programs written by Stephen Montgomery-Smith to solve certain combinatorial problems.  The first project was to compute the number of ways to place the 12 pentominoes and 5 tetrominoes in a 10x8 rectangle - the answer turned out to be 3,386,001,688.  (See http://www.xs4all.nl/~gp/PolyominoSolver/Polyomino.html for details.)