Copyright Tristan Aubrey-Jones May 2008.
Abstract: A project investigating and developing an implicitly concurrent programming language, based on a metaphor taken from the physical world, is reported. It uses a programming paradigm in which programs consist of systems of autonomous agents, or active objects, which communicate via message passing. A language enhancing Java with actors and linear types is presented. Example programs are written, compiled, and executed to evaluate the usefulness of the language. The language is found to provide a familiar notation for implicit parallelism and a compelling new model for concurrency, combining the performance of shared variables with the elegance of message passing.
Introductory Slides (PDF),
Report (PDF),
ActiveJava compiler prototype (ajavac),
ActiveJava runtime library (ajava_lang).
Examples:
calc - pocket calculator actor program
dining - dining philosophers actor program (never deadlocks)
sort - parallel quicksort implementation ("SortingBenchmark" sorts 10,000 random integers using actors, Java threads, and sequentially, and compares them)
To compile examples use:
compile.bat ./calc
compile.bat ./sort
compile.bat ./dining
To run examples use:
run ./calc Main
run ./dining Main
run ./dining Main fast
run ./sort Main
run ./sort SortingBenchmark
import org.taj.ajava.util.*;
import org.taj.ajava.lang.*;
/**
 * Actor that sorts an {@link IntegerArray} by recursive partitioning and writes a
 * trace of every step to {@code Stdout}.
 *
 * <p>A request is handled in three stages, each run by its own helper actor:
 * <ol>
 *   <li>{@link #react_0(IntegerArrayRequestMessage)} blocks this actor and starts an
 *       {@link R1} for the request.</li>
 *   <li>{@link R1#continuation()} either sorts a small array directly and replies, or
 *       partitions the array, splits it in two and forwards each half to a new
 *       {@code DebugIntSorter} (with {@code depth + 1}), naming an {@link R1.R2} as the
 *       reply target.</li>
 *   <li>{@link R1.R2#continuation()} runs once both halves have come back, merges them
 *       and replies to the original requester.</li>
 * </ol>
 *
 * <p>NOTE(review): the naming ({@code R1}, {@code THIS_LINK}, {@code react_N}) suggests
 * this file is emitted by the ActiveJava compiler rather than written by hand - confirm
 * before editing it manually.
 */
public class DebugIntSorter extends Actor
{
    /** Arrays with at most this many elements are sorted directly instead of being split. */
    private static final int MIN_PARTITION_SIZE = 200;

    /** Nesting level of this sorter; prefixes every trace line and is passed on as depth + 1. */
    private int depth;

    /** Creates a sorter at depth 0. */
    public DebugIntSorter()
    {
        this(0);
    }

    /**
     * Creates a sorter at the given depth.
     *
     * @param depth value printed in front of this sorter's trace lines
     */
    public DebugIntSorter(int depth)
    {
        this.depth = depth;
    }

    /** Request message carrying the {@link IntegerArray} to sort. */
    private static class IntegerArrayRequestMessage extends org.taj.ajava.runtime.ActorRequestMessage
    {
        /** The array to sort. */
        public IntegerArray value;

        /**
         * @param rsvp  actor passed to the superclass as the reply target
         * @param reqId request identifier passed to the superclass
         */
        public IntegerArrayRequestMessage(final org.taj.ajava.lang.Actor rsvp, final int reqId)
        {
            super(rsvp, reqId);
        }
    }

    /**
     * Helper actor that executes the body of one sort request on behalf of the owning
     * {@code DebugIntSorter} and finally delivers an {@code UnblockActor} to it.
     */
    public class R1 extends org.taj.ajava.lang.Actor
    {
        /** Set to 0 in the constructor; not read anywhere else in this class. */
        int MSG_WAITING_COUNT;

        /** Number of {@code ForkDone} messages still expected before {@link #done()} runs. */
        int FORK_WAITING_COUNT;

        /** The sorter that created this helper and is unblocked by {@link #done()}. */
        DebugIntSorter THIS_LINK;

        /** The request being served. */
        IntegerArrayRequestMessage requestMessage;

        // State of the request body; also read by the nested R2.
        IntegerArray array;
        Response responseMessage;
        int pivotIndex;
        int pivotNewIndex;
        int[] indices;
        IntegerArray[] parts;
        DebugIntSorter lhs;
        DebugIntSorter rhs;

        /**
         * @param THIS_LINK      sorter to unblock once the request has been fully handled
         * @param requestMessage request to serve
         */
        R1(final DebugIntSorter THIS_LINK, final IntegerArrayRequestMessage requestMessage)
        {
            this.MSG_WAITING_COUNT = 0;
            this.FORK_WAITING_COUNT = 1;
            this.THIS_LINK = THIS_LINK;
            this.requestMessage = requestMessage;
        }

        /** Reactor 0: a {@code Continue} message starts {@link #continuation()}. */
        private void react_0(org.taj.ajava.runtime.Continue doContinuationMessage)
        {
            continuation();
        }

        /** Wraps the message in an {@code ActorMessage} tagged with reactor id 0 and buffers it. */
        public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage)
        {
            bufferMessage(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 0));
        }

        /** Forwards directly to {@link #react_0(org.taj.ajava.runtime.Continue)}. */
        protected void react(org.taj.ajava.runtime.Continue doContinuationMessage)
        {
            react_0(doContinuationMessage);
        }

        /** Delivers an {@code UnblockActor} message to the owning sorter. */
        void done()
        {
            THIS_LINK.deliver(new org.taj.ajava.runtime.UnblockActor());
        }

        /**
         * Helper actor that waits for the two child sorters' responses, merges them into
         * the enclosing {@link R1}'s array and replies to the original request.
         */
        public class R2 extends org.taj.ajava.lang.Actor
        {
            /** Number of child responses still outstanding (starts at 2). */
            int MSG_WAITING_COUNT;

            /** Reply built by {@link #continuation()}; shadows the field of the same name in R1. */
            Response responseMessage;

            /** The {@link R1} that receives {@code ForkDone} from {@link #done()}. */
            R1 OWNER_RECEPTIONIST;

            /**
             * @param OWNER_RECEPTIONIST the R1 to notify when this helper has finished
             */
            R2(final R1 OWNER_RECEPTIONIST)
            {
                this.MSG_WAITING_COUNT = 2;
                this.OWNER_RECEPTIONIST = OWNER_RECEPTIONIST;
            }

            /**
             * Reactor 0: stores a child's result in {@code parts[requestId]} for request
             * ids 0 and 1, and runs {@link #continuation()} once the outstanding count
             * has dropped to zero or below. Other request ids leave the count unchanged.
             */
            private void react_0(DebugIntSorter.Response responseMessage)
            {
                switch (responseMessage.getRequestID()) {
                    case 0:
                        parts[0] = responseMessage.value;
                        this.MSG_WAITING_COUNT--;
                        break;
                    case 1:
                        parts[1] = responseMessage.value;
                        this.MSG_WAITING_COUNT--;
                        break;
                }
                if (this.MSG_WAITING_COUNT <= 0) {
                    continuation();
                }
            }

            /** Wraps the response in an {@code ActorMessage} tagged with reactor id 0 and buffers it. */
            public void deliver(DebugIntSorter.Response responseMessage)
            {
                bufferMessage(new org.taj.ajava.runtime.ActorMessage(responseMessage, 0));
            }

            /** Forwards directly to {@link #react_0(DebugIntSorter.Response)}. */
            protected void react(DebugIntSorter.Response responseMessage)
            {
                react_0(responseMessage);
            }

            /** Reactor 1: a {@code Continue} message runs {@link #continuation()} immediately. */
            private void react_1(org.taj.ajava.runtime.Continue doContinuationMessage)
            {
                continuation();
            }

            /** Wraps the message in an {@code ActorMessage} tagged with reactor id 1 and buffers it. */
            public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage)
            {
                bufferMessage(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 1));
            }

            /** Forwards directly to {@link #react_1(org.taj.ajava.runtime.Continue)}. */
            protected void react(org.taj.ajava.runtime.Continue doContinuationMessage)
            {
                react_1(doContinuationMessage);
            }

            /** Delivers a {@code ForkDone} message to the owning {@link R1}. */
            void done()
            {
                OWNER_RECEPTIONIST.deliver(new org.taj.ajava.runtime.ForkDone());
            }

            /**
             * Traces and merges the two parts into the enclosing R1's array, then replies
             * to the original request with the merged array. {@link #done()} is called
             * even if one of these steps throws.
             */
            void continuation()
            {
                try
                {
                    Stdout.getInstance().deliver(Integer.toString(depth) + ": Merging: " + parts[0].toString() + " and " + parts[1].toString() + "\n");
                    array.merge(parts);
                    Stdout.getInstance().deliver(Integer.toString(depth) + ": Merged into: " + array.toString() + "\n");
                    responseMessage = new Response(requestMessage);
                    responseMessage.value = array;
                    requestMessage.sendReply(responseMessage);
                }
                finally
                {
                    done();
                }
            }

            /**
             * Dispatches a buffered message by {@code msg.reactorId}: 0 goes to the
             * response reactor, 1 to the continue reactor, anything else to the superclass.
             */
            protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
            {
                switch (msg.reactorId) {
                    case 0:
                        react_0(((DebugIntSorter.Response)msg.payload));
                        return;
                    case 1:
                        react_1(((org.taj.ajava.runtime.Continue)msg.payload));
                        return;
                    default:
                        super.processMessage(msg);
                        return;
                }
            }
        }

        /**
         * Body of the sort request.
         *
         * <p>Arrays of at most {@code MIN_PARTITION_SIZE} elements are sorted with
         * {@code SorterMethods.sortArray} and returned to the requester straight away.
         * Larger arrays are partitioned around a pivot, split at the pivot's new index,
         * and each half is sent to a fresh {@code DebugIntSorter}; an {@link R2} collects
         * the two replies and finishes the request.
         */
        void continuation()
        {
            array = requestMessage.value;
            if (array.size() <= MIN_PARTITION_SIZE) {
                // Fix: this path creates no R2, so no ForkDone ever reaches react_1 and
                // done() was never called - the owning sorter, blocked in its react_0,
                // never received its UnblockActor. Call done() here, using the same
                // try/finally shape as R2.continuation().
                try
                {
                    SorterMethods.sortArray(array);
                    responseMessage = new Response(requestMessage);
                    responseMessage.value = array;
                    requestMessage.sendReply(responseMessage);
                }
                finally
                {
                    done();
                }
            }
            else {
                pivotIndex = SorterMethods.choosePivotIndex(array);
                pivotNewIndex = SorterMethods.partitionArray(array, pivotIndex);
                Stdout.getInstance().deliver(Integer.toString(depth) + ": Pivot Index: " + Integer.toString(pivotNewIndex) + "\n");
                Stdout.getInstance().deliver(Integer.toString(depth) + ": Split : " + array.toString() + "\n");
                indices = new int[1];
                indices[0] = pivotNewIndex;
                parts = array.split(indices);
                Stdout.getInstance().deliver(Integer.toString(depth) + ": Split into: " + parts[0].toString() + " and " + parts[1].toString() + "\n");
                lhs = new DebugIntSorter(depth + 1);
                rhs = new DebugIntSorter(depth + 1);
                // Request ids 0 and 1 tell R2 which slot of parts each reply belongs to.
                // done() is reached later on this path, via R2's ForkDone.
                R2 _R2 = new R2(this);
                lhs.deliver(DebugIntSorter.Request.create(_R2, 0, parts[0]));
                rhs.deliver(DebugIntSorter.Request.create(_R2, 1, parts[1]));
            }
        }

        /**
         * Reactor 1: counts down outstanding forks and calls {@link #done()} once the
         * count has dropped to zero or below.
         */
        private void react_1(org.taj.ajava.runtime.ForkDone responseMessage)
        {
            FORK_WAITING_COUNT--;
            if (FORK_WAITING_COUNT <= 0) {
                done();
            }
        }

        /** Wraps the message in an {@code ActorMessage} tagged with reactor id 1 and buffers it. */
        public void deliver(org.taj.ajava.runtime.ForkDone responseMessage)
        {
            bufferMessage(new org.taj.ajava.runtime.ActorMessage(responseMessage, 1));
        }

        /** Forwards directly to {@link #react_1(org.taj.ajava.runtime.ForkDone)}. */
        protected void react(org.taj.ajava.runtime.ForkDone responseMessage)
        {
            react_1(responseMessage);
        }

        /**
         * Dispatches a buffered message by {@code msg.reactorId}: 0 goes to the continue
         * reactor, 1 to the fork-done reactor, anything else to the superclass.
         */
        protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
        {
            switch (msg.reactorId) {
                case 0:
                    react_0(((org.taj.ajava.runtime.Continue)msg.payload));
                    return;
                case 1:
                    react_1(((org.taj.ajava.runtime.ForkDone)msg.payload));
                    return;
                default:
                    super.processMessage(msg);
                    return;
            }
        }
    }

    /**
     * Reactor 0: calls {@code blockActor()}, then hands the request to a new {@link R1}
     * and starts it with a {@code Continue} message. R1 delivers the matching
     * {@code UnblockActor} when the request has been handled.
     */
    private void react_0(IntegerArrayRequestMessage requestMessage)
    {
        blockActor();
        R1 _R1 = new R1(this, requestMessage);
        _R1.deliver(new org.taj.ajava.runtime.Continue());
    }

    /** Wraps the request in an {@code ActorMessage} tagged with reactor id 0 and buffers it. */
    public void deliver(IntegerArrayRequestMessage requestMessage)
    {
        bufferMessage(new org.taj.ajava.runtime.ActorMessage(requestMessage, 0));
    }

    /** Forwards directly to {@link #react_0(IntegerArrayRequestMessage)}. */
    protected void react(IntegerArrayRequestMessage requestMessage)
    {
        react_0(requestMessage);
    }

    /** Factory holder for sort requests. */
    public static class Request
    {
        /**
         * Builds a sort request.
         *
         * @param rsvp  actor passed to the request as the reply target
         * @param reqId request identifier
         * @param value array to sort
         * @return a new request message with {@code value} set
         */
        public static IntegerArrayRequestMessage create(final org.taj.ajava.lang.Actor rsvp, final int reqId, final IntegerArray value)
        {
            IntegerArrayRequestMessage m = new IntegerArrayRequestMessage(rsvp, reqId);
            m.value = value;
            return m;
        }
    }

    /** Response message carrying the sorted {@link IntegerArray}. */
    public static class Response extends org.taj.ajava.runtime.ActorResponseMessage
    {
        /** The sorted array. */
        public IntegerArray value;

        /**
         * @param request the request this response answers; passed to the superclass
         */
        private Response(final org.taj.ajava.runtime.ActorRequestMessage request)
        {
            super(request);
        }
    }

    /**
     * Dispatches a buffered message by {@code msg.reactorId}: 0 goes to the sort-request
     * reactor, anything else to the superclass.
     */
    protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
    {
        switch (msg.reactorId) {
            case 0:
                react_0(((IntegerArrayRequestMessage)msg.payload));
                return;
            default:
                super.processMessage(msg);
                return;
        }
    }
}