Java Collaborative Computing
              Java collaborative computing across computers.
 
 
  Load Balance, Failover Web Cluster  
 
In this example we will build a cluster for a web application. It is made up of one master node, which runs Tomcat, and several slave nodes, which run time-consuming services. When an HTTP request arrives, the master node picks a slave node and makes a remote procedure call to it to do the calculation work. If that node fails, the master tries another one.
To let the master know when slaves join or leave, we need to implement message receivers on the master node.
 
  Master Node  
  Four classes run on the master node:
  • Test: the servlet
  • NodeStatus: records the status of a slave node
  • HelloReceiver: receives the hello message when a slave node joins.
  • ByeReceiver: receives the bye message when a slave node leaves.
 
  Step 1: Record node status  
  We need to record the status of each slave node on the master node.
/**
 * Load and health bookkeeping for one slave node, kept on the master node.
 *
 * <p>Instances are updated by the threads that perform RPC calls. A servlet
 * container may serve several requests at once, so every method that changes
 * the counters is synchronized and the fields are volatile: a plain
 * {@code running++} from two threads could otherwise lose an update, and a
 * change to {@code isAlive} might not become visible to other threads.
 *
 * <p>The fields stay public because existing callers read them directly
 * (for example {@code node.running}); they should only be modified through
 * the {@code record*} methods.
 */
public class NodeStatus {

	/**
	 * Table of known slave nodes; a {@code null} entry means that no node is
	 * registered in that slot.
	 */
	public static NodeStatus[] nodes = new NodeStatus[256];

	/** Number of RPC calls currently running on this node. */
	public volatile int running = 0;
	/**
	 * Number of RPC calls that finished successfully on this node. Starts at 1
	 * so that the failure ratio in {@link #recordFail()} never divides by zero.
	 */
	public volatile int successCount = 1;
	/** Number of RPC calls that failed on this node. */
	public volatile int failCount = 0;

	/** Whether this node is still considered available. */
	public volatile boolean isAlive = true;
	/** Name of the host, needed to address the RPC call. */
	private final String host;

	/**
	 * @param host name of the slave host
	 */
	public NodeStatus(String host) {
		this.host = host;
	}

	/**
	 * @return the host name passed to the constructor
	 */
	public String getHost() {
		return host;
	}

	/**
	 * @return {@code true} until too many calls have failed on this node
	 */
	public boolean isAlive() {
		return isAlive;
	}

	/** Records that an RPC call has been started on this node. */
	public synchronized void recordCall() {
		running++;
	}

	/** Records that a previously started RPC call finished successfully. */
	public synchronized void recordSuccess() {
		running--;
		successCount++;
	}

	/**
	 * Records that a previously started RPC call failed. Once the number of
	 * failures exceeds half of {@code successCount}, the node is marked as not
	 * alive.
	 */
	public synchronized void recordFail() {
		running--;
		failCount++;
		// If RPCs fail too often on this node, the node itself has probably failed.
		if (failCount * 1.0 / successCount > 0.5) {
			isAlive = false;
		}
	}
}
 
  Step 2: Create message receiver  
  When a node joins or leaves the cluster, it sends out a message. When the master node learns that a slave node has joined or left, it adds or removes the corresponding NodeStatus.
//Receiver for the hello message a node sends when it joins.
public class HelloReceiver implements MessageReceiver {

	public static final int HELLO_SLOT = 0;

	/**
	 * @return {@link #HELLO_SLOT}
	 */
	@Override
	public int getSlot() {
		return HELLO_SLOT;
	}

	/**
	 * Stores a fresh NodeStatus for the sender in the node table and prints
	 * "&lt;host&gt; connected" to standard output.
	 *
	 * @param msg hello message whose data is the host name of the sender
	 */
	@Override
	public void receive(Message msg) {
		// The joining slave node puts its host name into the message data.
		final String host = msg.getData().toString();
		// Per the original author's note the source is the sender's ip address;
		// only its low byte is used as the index into the node table.
		final int index = msg.getSource() & 0xFF;
		NodeStatus.nodes[index] = new NodeStatus(host);
		System.out.println(host + " connected");
	}

}
//Receiver for the bye message a node sends when it leaves.
public class ByeReceiver implements MessageReceiver {

	public static final int BYE_SLOT = 1;

	/**
	 * @return {@link #BYE_SLOT}
	 */
	@Override
	public int getSlot() {
		return BYE_SLOT;
	}

	/**
	 * Clears the node table entry that belongs to the sender of the message.
	 *
	 * @param msg bye message; only its source is read
	 */
	@Override
	public void receive(Message msg) {
		// The low byte of the message source is the index into the node table.
		final int index = msg.getSource() & 0xFF;
		NodeStatus.nodes[index] = null;
	}

}
 
  Step 3: Create the servlet  
 
/**
 * Servlet implementation class Test
 */
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	
    public Test() {
        super();
    }

	public void init(ServletConfig config) throws ServletException {
	}

	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
	}

	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doGet(request, response);
	}

}
 
  Step 4: Initialize EndPoint in the init(ServletConfig config) method  
  We also need a RemoteProcedureRegistry field to make RPC calls.
public class Test extends HttpServlet {
	/** Registry used to make RPC calls to the slave nodes; set in init(). */
	RemoteProcedureRegistry rpcService;
	...
	/**
	 * Loads the configuration named by the "conf" init parameter, registers
	 * the hello/bye message receivers and obtains the RPC registry.
	 *
	 * @param config servlet configuration; must contain the "conf" parameter
	 * @throws ServletException if the "conf" parameter is missing or the
	 *         configuration file cannot be loaded
	 */
	@Override
	public void init(ServletConfig config) throws ServletException {
		// An override of init(ServletConfig) must hand the config to
		// GenericServlet, otherwise getServletConfig() does not work.
		super.init(config);
		String confPath = config.getInitParameter("conf");
		if (confPath == null) {
			throw new ServletException("Missing servlet init parameter 'conf'");
		}
		try {
			Configuration.load(new File(confPath));
		} catch (IOException e) {
			// Fail initialization instead of leaving a servlet in service
			// that has no receivers registered and no RPC registry.
			throw new ServletException("Cannot load configuration file " + confPath, e);
		}
		Configuration.print();
		EndPoint endPoint = EndPoint.getInstance();
		MessageService msgService = endPoint.getMessageService();
		msgService.registerReceiver(new HelloReceiver());
		msgService.registerReceiver(new ByeReceiver());
		rpcService = endPoint.getRemoteProcedureRegistry();
	}
	...
}
		
 
  Step 5: Strategy to pick a slave node  
  The strategy for dispatching requests varies between load-balancing clusters. Here we use a least-load approach: pick the node that is currently running the fewest tasks.
public class Test extends HttpServlet {
	...
	/**
	 * Picks the slave node that should serve the next call.
	 *
	 * <p>Entries that are empty, listed in {@code excludes} or not alive are
	 * skipped. The first remaining node with no running call is returned
	 * immediately; otherwise the node with the fewest running calls wins, and
	 * on a tie the one found first.
	 *
	 * @param excludes nodes that must not be chosen
	 * @return the chosen node, or {@code null} if no node qualifies
	 */
	private NodeStatus getNode(Collection<NodeStatus> excludes) {
		NodeStatus leastBusy = null;
		int lowestLoad = Integer.MAX_VALUE;
		for (NodeStatus candidate : NodeStatus.nodes) {
			if (candidate == null || excludes.contains(candidate) || !candidate.isAlive()) {
				continue;
			}
			// An idle node cannot be beaten, so stop searching.
			if (candidate.running == 0) {
				return candidate;
			}
			if (candidate.running < lowestLoad) {
				lowestLoad = candidate.running;
				leastBusy = candidate;
			}
		}
		return leastBusy;
	}
	...
}
 
  Step 6: Call the RPC in the doGet() method and complete the Servlet class  
 
public class Test extends HttpServlet {
	private static final long serialVersionUID = 1L;
	
	RemoteProcedureRegistry rpcService;
       
    public Test() {
        super();
    }

	public void init(ServletConfig config) throws ServletException {
        try {
			Configuration.load(new File(config.getInitParameter("conf")));
		} catch (IOException e) {
			e.printStackTrace();
			return;
		}
		Configuration.print();
        EndPoint endPoint = EndPoint.getInstance();
        MessageService msgService = endPoint.getMessageService();
		msgService.registerReceiver(new HelloReceiver());
        msgService.registerReceiver(new ByeReceiver());
        rpcService = endPoint.getRemoteProcedureRegistry();
	}

	protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		List args = extractArgs(request);
		Collection<NodeStatus> excludes = new ArrayList();
		for(int i=0;i<3;i++){
			NodeStatus node = getNode(excludes);
			if(node == null){
				response.getWriter().write("Server is busy.");//all nodes are busy or temporally unavailable
				return;
			}else{
				try {
					node.recordCall();
					Object result = rpcService.call(node.getHost(), ExampleProc.NAME, args);
					node.recordSuccess();
					extractResult(result, response);
					return;
				} catch (RemoteException e) {
					e.printStackTrace();
					node.recordFail();
					excludes.add(node);
				}
			}
		}
	}

	private NodeStatus getNode(Collection<NodeStatus> excludes) {
		int count = Integer.MAX_VALUE;
		int id = -1;
		NodeStatus[] nodes = NodeStatus.nodes;
		for(int i=0;i<nodes.length;i++){
			NodeStatus n = nodes[i];
			if(n != null && !excludes.contains(n)&&n.isAlive()){
				if(n.running == 0){
					return nodes[i];
				}
				if(n.running<count){
					count = n.running;
					id = i;
				}
			}
		}
		if(id == -1){
			return null;
		}else{
			return nodes[id];
		}
	}

	private void extractResult(Object result, HttpServletResponse response) throws IOException {
		response.getWriter().write(result.toString());
	}

	private List extractArgs(HttpServletRequest request) {
		ArrayList arr = new ArrayList();
		arr.add(request.getParameter("name"));
		return arr;
	}

	protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
		doGet(request, response);
	}

}
		
 
  Slave Node  
  The slave node is much simpler: it just responds to the RPC calls. There are 2 classes on the slave node:
  • ExampleProc: a RemoteProcedure class that responds to calls from the master
  • Main: the class that creates the EndPoint instance and registers the RemoteProcedure.
 
  Step 1: the RemoteProcedure  
 
	/** Name of this procedure; returned by getName(). */
	public static final String NAME = "exampleProc";
	private String name;//the name of this node
	
	/**
	 * @param name name of this node; call() puts it in front of every reply
	 */
	public ExampleProc(String name){
		this.name = name;
	}

	/**
	 * Waits three seconds and then returns a greeting made of this node's
	 * name and the first argument.
	 *
	 * @param args call arguments; the first element is used in the greeting
	 * @return the text "&lt;node name&gt;:Hello, &lt;first argument&gt;"
	 */
	@SuppressWarnings("unchecked")
	@Override
	public Object call(List args) throws RemoteException {
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Catching InterruptedException clears the interrupt flag; set it
			// again so the code that runs this call can still see the request.
			Thread.currentThread().interrupt();
		}
		return name + ":Hello, " + args.get(0);
	}

	/**
	 * @return the constant NAME
	 */
	@Override
	public String getName() {
		return NAME;
	}

}
		
 
  Step 2: the Main class  
 
/**
 * Entry point of a slave node: loads the configuration, registers the example
 * procedure and announces the node with a hello message.
 */
public class Main {

	/**
	 * @param args {@code args[0]} is the path of the configuration file
	 */
	public static void main(String[] args) {
		if (args.length < 1) {
			System.err.println("Usage: Main <configuration file>");
			return;
		}
		try {
			Configuration.load(new File(args[0]));
		} catch (IOException e) {
			e.printStackTrace();
			// Do not join the cluster when the configuration could not be loaded.
			return;
		}
		Configuration.print();
		EndPoint endPoint = EndPoint.getInstance();
		String name = endPoint.getMembers().getLocal().getName();
		endPoint.getRemoteProcedureRegistry().register(new ExampleProc(name));//register rpc procedure
		try {
			//Tell the master that this node is ready.
			//Note that this is a broadcast message sent to all members, so you don't need to know where the master is.
			endPoint.getMessageService().sendMessage(new StringMessage(HelloReceiver.HELLO_SLOT, name));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}
		
 
  Setup  
      First, start the master node. You have to visit the servlet once so that the EndPoint instance gets initialized. You will see the message 'Server is busy.' on the page. That is expected, because no slave node is available yet.
    Then run the Main class on the slave node. You will see the message "<slave node name> connected" printed on the master node. Now the servlet is ready to accept requests.
    To take a slave node offline, send a goodbye message from that slave node (sending this message is not included in this example). The master node will then stop sending requests to it, and you can shut the node down and do some maintenance work.