ActiveJava

Copyright Tristan Aubrey-Jones May 2008.

Abstract: A project investigating and developing an implicitly concurrent programming language, based on a metaphor taken from the physical world is reported. Uses a programming paradigm where programs consist of systems of autonomous agents, or active objects which communicate via message passing. A language enhancing Java with actors and linear types is presented. Example programs are written, compiled, and executed to evaluate the usefulness of the language. The language found to provide a familiar notation for implicit parallelism, and a compelling new model for concurrency, combining the performance of shared variables with the elegance of message passing.

Introductory Slides (PDF), Report (PDF),
ActiveJava compiler prototype (ajavac), ActiveJava runtime library (ajava_lang).

Examples:

calc - pocket calculator actor program
dining - dining philosophers actor program (never deadlocks)
sort - parallel quicksort implementation ("SortBenchmark" sorts 10,000 random integers using actors, java threads, and sequentially and compares)
To compile examples use:
compile.bat ./calc
compile.bat ./sort
compile.bat ./dining
To run examples use:
run ./calc Main
run ./dining Main
run ./dining Main fast
run ./sort Main
run ./sort SortingBenchmark

IntSorter.java

home Home   up Up   ( Download )



import org.taj.ajava.util.*;
import org.taj.ajava.lang.*;

public class IntSorter extends Actor
{
   
private static class IntegerArrayRequestMessage extends org.taj.ajava.runtime.ActorRequestMessage
   
{
       
public IntegerArray value;
       
public IntegerArrayRequestMessage(final org.taj.ajava.lang.Actor rsvp, final int reqId)
       
{
           
super(rsvp, reqId);
       
}
   
}
   
public class R3 extends org.taj.ajava.lang.Actor
   
{
       
int MSG_WAITING_COUNT;
        R3
(final IntSorter THIS_LINK, final IntegerArrayRequestMessage requestMessage)
       
{
           
this.MSG_WAITING_COUNT = 0;
           
this.FORK_WAITING_COUNT = 1;
           
this.THIS_LINK = THIS_LINK;
           
this.requestMessage = requestMessage;
       
}
       
private void react_0(org.taj.ajava.runtime.Continue doContinuationMessage)
       
{
            continuation
();
       
}
       
public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage)
       
{
            bufferMessage
(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 0));
       
}
       
protected void react(org.taj.ajava.runtime.Continue doContinuationMessage)
       
{
            react_0
(doContinuationMessage);
       
}
       
void done()
       
{
            THIS_LINK
.deliver(new org.taj.ajava.runtime.UnblockActor());
       
}
       
public class R4 extends org.taj.ajava.lang.Actor
       
{
           
int MSG_WAITING_COUNT;
            R4
(final R3 OWNER_RECEPTIONIST)
           
{
               
this.MSG_WAITING_COUNT = 2;
               
this.OWNER_RECEPTIONIST = OWNER_RECEPTIONIST;
           
}
           
private void react_0(IntSorter.Response responseMessage)
           
{
               
switch (responseMessage.getRequestID()) {
                   
case 0:
                   
{
                        parts
[0] = responseMessage.value;
                       
this.MSG_WAITING_COUNT--;
                       
break;
                   
}
                   
case 1:
                   
{
                        parts
[1] = responseMessage.value;
                       
this.MSG_WAITING_COUNT--;
                       
break;
                   
}
               
}
               
if (this.MSG_WAITING_COUNT <= 0)                 continuation();

           
}
           
public void deliver(IntSorter.Response responseMessage)
           
{
                bufferMessage
(new org.taj.ajava.runtime.ActorMessage(responseMessage, 0));
           
}
           
protected void react(IntSorter.Response responseMessage)
           
{
                react_0
(responseMessage);
           
}
           
private void react_1(org.taj.ajava.runtime.Continue doContinuationMessage)
           
{
                continuation
();
           
}
           
public void deliver(org.taj.ajava.runtime.Continue doContinuationMessage)
           
{
                bufferMessage
(new org.taj.ajava.runtime.ActorMessage(doContinuationMessage, 1));
           
}
           
protected void react(org.taj.ajava.runtime.Continue doContinuationMessage)
           
{
                react_1
(doContinuationMessage);
           
}
           
void done()
           
{
                OWNER_RECEPTIONIST
.deliver(new org.taj.ajava.runtime.ForkDone());
           
}
           
void continuation()
           
{
               
try
               
{
                   
{
                        array
.merge(parts);
                       
{
                           
{
                                responseMessage
= new Response(requestMessage);
                           
}
                            responseMessage
.value = array;
                            requestMessage
.sendReply(responseMessage);
                           
return;
                       
}
                   
}
               
}
               
finally
               
{
                   
done();
               
}
           
}
           
Response responseMessage;
            R3 OWNER_RECEPTIONIST
;
           
protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
           
{
               
switch (msg.reactorId) {
                   
case 0:
                   
{
                        react_0
(((IntSorter.Response)msg.payload));
                       
return;
                   
}
                   
case 1:
                   
{
                        react_1
(((org.taj.ajava.runtime.Continue)msg.payload));
                       
return;
                   
}
                   
default:
                   
{
                       
super.processMessage(msg);
                       
return;
                   
}
               
}
           
}
       
}
       
void continuation()
       
{
           
{
               
{
                   
{
                        array
= requestMessage.value;
                   
}
                   
if (array.size() <= SorterMethods.MIN_PARTITION_SIZE)                     {
                       
SorterMethods.sortArray(array);
                       
{
                           
{
                                responseMessage
= new Response(requestMessage);
                           
}
                            responseMessage
.value = array;
                            requestMessage
.sendReply(responseMessage);
                           
return;
                       
}
                   
}

                   
else                     {
                       
{
                            pivotIndex
= SorterMethods.choosePivotIndex(array);
                       
}
                       
{
                            pivotNewIndex
= SorterMethods.partitionArray(array, pivotIndex);
                       
}
                       
{
                            indices
= new int[1];
                       
}
                        indices
[0] = pivotNewIndex;
                       
{
                            parts
= array.split(indices);
                       
}
                       
{
                            lhs
= new IntSorter();
                       
}
                       
{
                            rhs
= new IntSorter();
                       
}
                       
{
                            R4 _R4
= new R4(this);
                            lhs
.deliver(IntSorter.Request.create(_R4, 0, parts[0]));
                            rhs
.deliver(IntSorter.Request.create(_R4, 1, parts[1]));
                       
}
                   
}

               
}
           
}
       
}
       
IntegerArray array;
       
Response responseMessage;
       
int pivotIndex;
       
int pivotNewIndex;
       
int[] indices;
       
IntegerArray[] parts;
       
IntSorter lhs;
       
IntSorter rhs;
       
int FORK_WAITING_COUNT;
       
private void react_1(org.taj.ajava.runtime.ForkDone responseMessage)
       
{
            FORK_WAITING_COUNT
--;
           
if (FORK_WAITING_COUNT <= 0)             done();

       
}
       
public void deliver(org.taj.ajava.runtime.ForkDone responseMessage)
       
{
            bufferMessage
(new org.taj.ajava.runtime.ActorMessage(responseMessage, 1));
       
}
       
protected void react(org.taj.ajava.runtime.ForkDone responseMessage)
       
{
            react_1
(responseMessage);
       
}
       
IntSorter THIS_LINK;
       
IntegerArrayRequestMessage requestMessage;
       
protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
       
{
           
switch (msg.reactorId) {
               
case 0:
               
{
                    react_0
(((org.taj.ajava.runtime.Continue)msg.payload));
                   
return;
               
}
               
case 1:
               
{
                    react_1
(((org.taj.ajava.runtime.ForkDone)msg.payload));
                   
return;
               
}
               
default:
               
{
                   
super.processMessage(msg);
                   
return;
               
}
           
}
       
}
   
}
   
private void react_0(IntegerArrayRequestMessage requestMessage)
   
{
        blockActor
();
       
{
            R3 _R3
= new R3(this, requestMessage);
            _R3
.deliver(new org.taj.ajava.runtime.Continue());
       
}
   
}
   
public void deliver(IntegerArrayRequestMessage requestMessage)
   
{
        bufferMessage
(new org.taj.ajava.runtime.ActorMessage(requestMessage, 0));
   
}
   
protected void react(IntegerArrayRequestMessage requestMessage)
   
{
        react_0
(requestMessage);
   
}
   
public static class Request
   
{
       
public static IntegerArrayRequestMessage create(final org.taj.ajava.lang.Actor rsvp, final int reqId, final IntegerArray value)
       
{
           
IntegerArrayRequestMessage m = new IntegerArrayRequestMessage(rsvp, reqId);
            m
.value = value;
           
return m;
       
}
   
}
   
public static class Response extends org.taj.ajava.runtime.ActorResponseMessage
   
{
       
public IntegerArray value;
       
private Response(final org.taj.ajava.runtime.ActorRequestMessage request)
       
{
           
super(request);
       
}
   
}
   
protected void processMessage(org.taj.ajava.runtime.ActorMessage msg)
   
{
       
switch (msg.reactorId) {
           
case 0:
           
{
                react_0
(((IntegerArrayRequestMessage)msg.payload));
               
return;
           
}
           
default:
           
{
               
super.processMessage(msg);
               
return;
           
}
       
}
   
}
}