ActiveJava

Copyright Tristan Aubrey-Jones May 2008.

Abstract: A project investigating and developing an implicitly concurrent programming language, based on a metaphor taken from the physical world is reported. Uses a programming paradigm where programs consist of systems of autonomous agents, or active objects which communicate via message passing. A language enhancing Java with actors and linear types is presented. Example programs are written, compiled, and executed to evaluate the usefulness of the language. The language found to provide a familiar notation for implicit parallelism, and a compelling new model for concurrency, combining the performance of shared variables with the elegance of message passing.

Introductory Slides (PDF), Report (PDF),
ActiveJava compiler prototype (ajavac), ActiveJava runtime library (ajava_lang).

Examples:

calc - pocket calculator actor program
dining - dining philosophers actor program (never deadlocks)
sort - parallel quicksort implementation ("SortBenchmark" sorts 10,000 random integers using actors, java threads, and sequentially and compares)
To compile examples use:
compile.bat ./calc
compile.bat ./sort
compile.bat ./dining
To run examples use:
run ./calc Main
run ./dining Main
run ./dining Main fast
run ./sort Main
run ./sort SortingBenchmark

DebugIntSorter.java

home Home   up Up   ( Download )


import org.taj.ajava.util.*; import org.taj.ajava.lang.*; public class DebugIntSorter extends Actor { private static final int MIN_PARTITION_SIZE = 200; private int depth; public DebugIntSorter() { this(0); } public DebugIntSorter(int depth) { this.depth = depth; } private static class IntegerArrayRequestMessage extends org.taj.ajava.runtime.ActorRequestMessage { public IntegerArray value; public IntegerArrayRequestMessage(final org.taj.ajava.lang.Actor rsvp, final int reqId) { super(rsvp, reqId); } } public class R1 extends org.taj.ajava.lang.Actor { int MSG_WAITING_COUNT; R1(final DebugIntSorter THIS_LINK, final IntegerArrayRequestMessage requestMessage) { this.MSG_WAITING_COUNT = 0; this.FORK_WAITING_COUNT = 1; this.THIS_LINK = THIS_LINK; this.requestMessage = requestMessage; } private void react_0(org.taj.ajava.runtime.Continue doContinuationMessage) { continuation(); } public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage) { bufferMessage(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 0)); } protected void react(org.taj.ajava.runtime.Continue doContinuationMessage) { react_0(doContinuationMessage); } void done() { THIS_LINK.deliver(new org.taj.ajava.runtime.UnblockActor()); } public class R2 extends org.taj.ajava.lang.Actor { int MSG_WAITING_COUNT; R2(final R1 OWNER_RECEPTIONIST) { this.MSG_WAITING_COUNT = 2; this.OWNER_RECEPTIONIST = OWNER_RECEPTIONIST; } private void react_0(DebugIntSorter.Response responseMessage) { switch (responseMessage.getRequestID()) { case 0: { parts[0] = responseMessage.value; this.MSG_WAITING_COUNT--; break; } case 1: { parts[1] = responseMessage.value; this.MSG_WAITING_COUNT--; break; } } if (this.MSG_WAITING_COUNT <= 0) continuation(); } public void deliver(DebugIntSorter.Response responseMessage) { bufferMessage(new org.taj.ajava.runtime.ActorMessage(responseMessage, 0)); } protected void react(DebugIntSorter.Response responseMessage) { react_0(responseMessage); } private void react_1(org.taj.ajava.runtime.Continue doContinuationMessage) { continuation(); } public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage) { bufferMessage(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 1)); } protected void react(org.taj.ajava.runtime.Continue doContinuationMessage) { react_1(doContinuationMessage); } void done() { OWNER_RECEPTIONIST.deliver(new org.taj.ajava.runtime.ForkDone()); } void continuation() { try { { Stdout.getInstance().deliver(Integer.toString(depth) + ": Merging: " + parts[0].toString() + " and " + parts[1].toString() + "\n"); array.merge(parts); Stdout.getInstance().deliver(Integer.toString(depth) + ": Merged into: " + array.toString() + "\n"); { { responseMessage = new Response(requestMessage); } responseMessage.value = array; requestMessage.sendReply(responseMessage); return; } } } finally { done(); } } Response responseMessage; R1 OWNER_RECEPTIONIST; protected void processMessage(org.taj.ajava.runtime.ActorMessage msg) { switch (msg.reactorId) { case 0: { react_0(((DebugIntSorter.Response)msg.payload)); return; } case 1: { react_1(((org.taj.ajava.runtime.Continue)msg.payload)); return; } default: { super.processMessage(msg); return; } } } } void continuation() { { { { array = requestMessage.value; } if (array.size() <= MIN_PARTITION_SIZE) { SorterMethods.sortArray(array); { { responseMessage = new Response(requestMessage); } responseMessage.value = array; requestMessage.sendReply(responseMessage); return; } } else { { pivotIndex = SorterMethods.choosePivotIndex(array); } { pivotNewIndex = SorterMethods.partitionArray(array, pivotIndex); } Stdout.getInstance().deliver(Integer.toString(depth) + ": Pivot Index: " + Integer.toString(pivotNewIndex) + "\n"); Stdout.getInstance().deliver(Integer.toString(depth) + ": Split : " + array.toString() + "\n"); { indices = new int[1]; } indices[0] = pivotNewIndex; { parts = array.split(indices); } Stdout.getInstance().deliver(Integer.toString(depth) + ": Split into: " + parts[0].toString() + " and " + parts[1].toString() + "\n"); { lhs = new DebugIntSorter(depth + 1); } { rhs = new DebugIntSorter(depth + 1); } { R2 _R2 = new R2(this); lhs.deliver(DebugIntSorter.Request.create(_R2, 0, parts[0])); rhs.deliver(DebugIntSorter.Request.create(_R2, 1, parts[1])); } } } } } IntegerArray array; Response responseMessage; int pivotIndex; int pivotNewIndex; int[] indices; IntegerArray[] parts; DebugIntSorter lhs; DebugIntSorter rhs; int FORK_WAITING_COUNT; private void react_1(org.taj.ajava.runtime.ForkDone responseMessage) { FORK_WAITING_COUNT--; if (FORK_WAITING_COUNT <= 0) done(); } public void deliver(org.taj.ajava.runtime.ForkDone responseMessage) { bufferMessage(new org.taj.ajava.runtime.ActorMessage(responseMessage, 1)); } protected void react(org.taj.ajava.runtime.ForkDone responseMessage) { react_1(responseMessage); } DebugIntSorter THIS_LINK; IntegerArrayRequestMessage requestMessage; protected void processMessage(org.taj.ajava.runtime.ActorMessage msg) { switch (msg.reactorId) { case 0: { react_0(((org.taj.ajava.runtime.Continue)msg.payload)); return; } case 1: { react_1(((org.taj.ajava.runtime.ForkDone)msg.payload)); return; } default: { super.processMessage(msg); return; } } } } private void react_0(IntegerArrayRequestMessage requestMessage) { blockActor(); { R1 _R1 = new R1(this, requestMessage); _R1.deliver(new org.taj.ajava.runtime.Continue()); } } public void deliver(IntegerArrayRequestMessage requestMessage) { bufferMessage(new org.taj.ajava.runtime.ActorMessage(requestMessage, 0)); } protected void react(IntegerArrayRequestMessage requestMessage) { react_0(requestMessage); } public static class Request { public static IntegerArrayRequestMessage create(final org.taj.ajava.lang.Actor rsvp, final int reqId, final IntegerArray value) { IntegerArrayRequestMessage m = new IntegerArrayRequestMessage(rsvp, reqId); m.value = value; return m; } } public static class Response extends org.taj.ajava.runtime.ActorResponseMessage { public IntegerArray value; private Response(final org.taj.ajava.runtime.ActorRequestMessage request) { super(request); } } protected void processMessage(org.taj.ajava.runtime.ActorMessage msg) { switch (msg.reactorId) { case 0: { react_0(((IntegerArrayRequestMessage)msg.payload)); return; } default: { super.processMessage(msg); return; } } } }