/* * Copyright (C) 2003 Peter Jonathan Klauser. All rights reserved. * * This software is published under the terms of the Jumpi Software * License version 1.1, a copy of which has been included with this * distribution in the LICENSE-jumpi.txt file. * */ package org.jumpi.examples.tcp; import org.jumpi.Destination; import org.jumpi.Handle; import org.jumpi.Jumpi; import org.jumpi.Selector; import org.jumpi.Status; import org.jumpi.examples.JumpiClientApplication; import org.jumpi.examples.JumpiClientApplicationRunner; import java.util.Hashtable; import java.util.Vector; /** * Receive messages from any number of Clients and send responses back. A large * delay of 1s per incoming message shows how a slow consumer can result in * message loss if the flowcontroltimeout positive and less than 1sRequestors. * A Selector is used to store all send and receive operation handles and * allows the SlowServer to do a blocking select. The SlowServer is * single-threaded handling received messages and responding with the same * Thread. */ public class SlowServer implements JumpiClientApplication { /** The runner. */ JumpiClientApplicationRunner runner_ = null; /** * Set the runner. * * @param runner the runner. */ public void setJumpiClientApplicationRunner( JumpiClientApplicationRunner runner) { runner_ = runner; } /** * Return the configuration values to use in doWork(). * * @return the configuration values to use. */ public Hashtable getConfiguration() { Hashtable config = new Hashtable(); config.put("org.jumpi.configurationloader", "org.jumpi.impl.ConfigurationLoader"); config.put("org.jumpi.controller", "ordering"); config.put("org.jumpi.controller.ordering.classname", "org.jumpi.impl.controller.ordering.OrderingController"); config.put("org.jumpi.controller.ordering.connector", "tcp"); config.put("org.jumpi.controller.ordering.connector.tcp.classname", "org.jumpi.impl.connector.tcp.TcpConnector"); config.put("org.jumpi.controller.ordering.connector.tcp.protocol", "tcp"); config.put("org.jumpi.controller.ordering.connector.tcp.servermode", "true"); config.put("org.jumpi.controller.ordering.connector.tcp.flowcontroltimeout", "-1"); config.put("org.jumpi.controller.ordering.connector.tcp.acceptor", "accept"); config.put("org.jumpi.controller.ordering.connector.tcp.acceptor.accept.classname", "org.jumpi.impl.connector.tcp.TcpConnectionAcceptor"); config.put("org.jumpi.controller.ordering.connector.tcp.acceptor.accept.port", "8888"); config.put("org.jumpi.controller.ordering.connector.tcp.acceptor.accept.address", "127.0.0.1"); config.put("org.jumpi.controller.ordering.connector.tcp.sendtransformer", "serialize"); config.put("org.jumpi.controller.ordering.connector.tcp.sendtransformer.serialize.classname", "org.jumpi.impl.transformer.serialization.SerializeTransformer"); config.put("org.jumpi.controller.ordering.connector.tcp.recvtransformer", "deserialize"); config.put("org.jumpi.controller.ordering.connector.tcp.recvtransformer.deserialize.classname", "org.jumpi.impl.transformer.serialization.DeserializeTransformer"); return config; } /** * Run the example code. * * @param j the started, configured Jumpi instance to use in the example. * * @throws Exception if any failure condition occurs. */ public void doWork(Jumpi j) throws Exception { // we keep a record of the last message sent from each peer Hashtable lastMessages = new Hashtable(); // We use a wildcard receive to accept from any port on the // local machine. Alternatively tcp://*:* for any host and any port. Destination anyone = j.getDestination("tcp://localhost:*"); // perform a receive to receive from anyone on the local host. Handle anyoneHdl = j.recv(anyone); // the selector holds all receiving handles Selector allHandles = j.getSelector(); // place the Handle receiving from anyone in the Selector. allHandles.addHandle(anyoneHdl); long startTime = System.currentTimeMillis(); long now = startTime; // loop for one minute while ((startTime + 60000) > now) { // asynchronously receive a message from a client. runner_.log("Starting to select."); allHandles.select(60000); // once a handle has reached a final non blocking state then // it should be removed, otherwise later selects never block. Vector readyToReceiveHandles = allHandles.removeNonBlockingHandles(); for (int i = 0; i < readyToReceiveHandles.size(); i++) { Handle hdl = (Handle) readyToReceiveHandles.get(i); if (hdl.isRecvHandle()) { // if the handle belongs to a receive operation // figure out who is the sender Destination sender = hdl.getSender(); Object message = hdl.getMessage(); if (hdl.getStatusCode() == Status.STATUS_SUCCESS) { // print out infos handleMessage(sender, message, lastMessages); // respond to the client - asynchronously, adding the handle to the selector Handle sendHdl = j.send(sender, "Received: " + message, allHandles); // simulate work being done synchronously which prohibits concurrent receives. Thread.currentThread().sleep(1000); // receive the next messages directly from the specific sender // add the receiving handle direcly to the selector. Handle peerHdl = j.recv(sender, allHandles); } else { // if we were unsuccessful receiving - then so what // the client will eventually reconnect. runner_.evaluateHandle(hdl); } if (hdl == anyoneHdl) { // we schedule another receive from any destination to // accept more new connections anyoneHdl = j.recv(anyone, allHandles); } } else { // we have a send operation finished - so just indicate the status runner_.evaluateHandle(hdl); } } now = System.currentTimeMillis(); } // after one minute - we go through all the remaining // handles and cancel them Vector remainingHandles = allHandles.getAllHandles(); for (int i = 0; i < remainingHandles.size(); i++) { Handle hdl = (Handle) remainingHandles.get(i); hdl.cancel(); } allHandles.clear(); } private void handleMessage(Destination sender, Object message, Hashtable lastMessages) { if (message instanceof Integer) { Integer msg = (Integer) message; Integer lastMsg = (Integer) lastMessages.remove(sender.getUrl()); if (lastMsg == null) { runner_.log("First message received from : " + sender.getUrl()); } else { if ((lastMsg.intValue() + 1) != msg.intValue()) { // missing some messages runner_.log("Missing messages from: " + lastMsg + " to : " + msg + " from : " + sender.getUrl()); } else { runner_.log("Inline message : " + msg + " from : " + sender.getUrl()); } } lastMessages.put(sender.getUrl(), msg); } else { runner_.log("Invalid message type received from: " + sender.getUrl()); } } } /* * Version History * $Log: SlowServer.java,v $ * Revision 1.2 2003/06/09 21:37:03 klp * CONFIG_LOADER_KEY refactored to be 'org.jumpi.configurationloader' property * * Revision 1.1 2003/05/20 19:40:59 klp * transfer to sf.net * */