Scenario
服務(wù)器1:客戶端n 發(fā)送Notification給客戶端的后處理
服務(wù)器端給第一個(gè)客戶端發(fā)送notification,然后在限定時(shí)間內(nèi),等待客戶端作出回應(yīng)—向服務(wù)器發(fā)送request。如果客戶端一直沒有回復(fù),服務(wù)器會(huì)在到達(dá)限定時(shí)間后,向第二個(gè)客戶端發(fā)送notification。如果客戶端在限定時(shí)間內(nèi)回復(fù),服務(wù)器端放棄再給其他客戶端發(fā)送消息。
Design
服務(wù)器、客戶端使用socket發(fā)送和接收信息
發(fā)送端存在一個(gè)客戶端列表,每次發(fā)送一給一個(gè)客戶端,發(fā)送后,向Helper發(fā)送添加該客戶端id的請(qǐng)求。
客戶端收到信息會(huì)向Helper發(fā)送刪除該id的請(qǐng)求。
Helper收到add時(shí),啟動(dòng)一個(gè)ScheduledExecutorService類的schedule,延時(shí)啟動(dòng)一個(gè)線程,并將該schedule緩存。remove時(shí),從緩存里取出schedule并停止它。如果在延時(shí)時(shí)間內(nèi),線程沒有被停止,它會(huì)被執(zhí)行:從緩存中取出,告訴服務(wù)器向下一個(gè)客戶端發(fā)送請(qǐng)求。
服務(wù)器1:客戶端n 發(fā)送Notification給客戶端的后處理
服務(wù)器端給第一個(gè)客戶端發(fā)送notification,然后在限定時(shí)間內(nèi),等待客戶端作出回應(yīng)—向服務(wù)器發(fā)送request。如果客戶端一直沒有回復(fù),服務(wù)器會(huì)在到達(dá)限定時(shí)間后,向第二個(gè)客戶端發(fā)送notification。如果客戶端在限定時(shí)間內(nèi)回復(fù),服務(wù)器端放棄再給其他客戶端發(fā)送消息。
Design
服務(wù)器、客戶端使用socket發(fā)送和接收信息
發(fā)送端存在一個(gè)客戶端列表,每次發(fā)送一給一個(gè)客戶端,發(fā)送后,向Helper發(fā)送添加該客戶端id的請(qǐng)求。
客戶端收到信息會(huì)向Helper發(fā)送刪除該id的請(qǐng)求。
Helper收到add時(shí),啟動(dòng)一個(gè)ScheduledExecutorService類的schedule,延時(shí)啟動(dòng)一個(gè)線程,并將該schedule緩存。remove時(shí),從緩存里取出schedule并停止它。如果在延時(shí)時(shí)間內(nèi),線程沒有被停止,它會(huì)被執(zhí)行:從緩存中取出,告訴服務(wù)器向下一個(gè)客戶端發(fā)送請(qǐng)求。
UML
Code


public class Server {
public static void main(String[] args) throws Exception {
final String id = "100";
ServerSocket serverSocket = new ServerSocket(IO.BIO_TCP_PORT);
System.out.println("Server is listening on port: " + IO.BIO_TCP_PORT);
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (Exception e) {
System.out.println("accept socket error.");
}
SendingNotification sender = new SendingNotification(id, socket);
sender.start();
ReceivingRequest receiver=new ReceivingRequest(socket);
receiver.start();
}


public class SendingNotification extends Thread {
private String id;
private Socket socket;
public SendingNotification(String sdId, Socket socket) {
this.id = sdId;
this.socket = socket;
}
@Override
public void run() {
Helper.getInstance().add(id);
OutputStream outputStream = null;
byte[] buffer = new byte[1024];
try {
outputStream = socket.getOutputStream();
buffer = (id+"\n").getBytes();
outputStream.write(buffer);
outputStream.flush();
} catch (Exception e) {
System.out.println("don't send success");
try {
outputStream.close();
socket.close();
} catch (Exception e1) {
}
}
}
}


public class ReceivingRequest extends Thread {
private Socket socket;
public ReceivingRequest(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in;
boolean finished = false;
while (!finished) {
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String line = in.readLine();
if (line == null) {
Thread.sleep(100);
continue;
}
Helper.getInstance().remove(line);
in.close();
socket.close();
finished = true;
} catch (Exception e) {
System.out.println("receive fails to run.");
}
}
}
}


public class Helper {
private static Helper instance = new Helper();
private ConcurrentHashMap<String, Schedule> cache = new ConcurrentHashMap<String, Schedule>();
private int timeout = 10;
public static Helper getInstance() {
return instance;
}
private Schedule addTask(final String id) {
final Schedule schedule = new Schedule();
schedule.schedule(new Runnable() {
public void run() {
doNext(id);
schedule.shutdown();
}
}, timeout, SECONDS);
return schedule;
}
private void doNext(String id) {
Schedule schedule = cache.remove(id);
System.out.println("time out and do next well.");
System.out.println("total time=" + schedule.getSeconds());
}
public void add(final String id) {
Schedule schedule = addTask(id);
cache.put(id, schedule);
System.out.println("Add to cache successfully");
}
public void remove(final String id) {
Schedule schedule = cache.remove(id);
if (schedule == null)
System.out.println("no schedule exist.");
else {
schedule.shutdown();
System.out.println("Remove to cache successfully");
}
}


public class Schedule {
ScheduledExecutorService excutor;
private long startTime;
public Schedule() {
excutor = Executors.newSingleThreadScheduledExecutor();
startTime = System.currentTimeMillis();
}
public long getTotalTime() {
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
public String getSeconds() {
long s = getTotalTime() / 1000;
return s + " seconds";
}
public void schedule(Runnable command, long delay, TimeUnit unit) {
excutor.schedule(command, delay, unit);
}
public void shutdown() {
excutor.shutdownNow();
}
}


public class Client {
public static void main(String[] args) {
Socket socket;
try {
socket = new Socket(IO.SERVER_IP, IO.BIO_TCP_PORT);
readLine(socket);
} catch (UnknownHostException e) {
} catch (IOException e) {
} catch (InterruptedException e) {
}
}
private static void readLine(Socket socket) throws IOException, InterruptedException {
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
boolean flag = true;
while (flag) {
String command = in.readLine();
if (command == null) {
flag = true;
continue;
} else {
//Thread.sleep(2000);
out.println(command);
out.flush();
out.close();
in.close();
socket.close();
flag = false;
}
}
}


public interface IO {
String SERVER_IP = "127.0.0.1";//"192.168.225.166";
int BIO_TCP_PORT = 9109;