1 Overview
??? JGroups是一個用于建立可靠的組播通信的工具包(這里指的組播并不一定是IP
Multicast,JGroups同樣支持使用TCP作為傳輸協議)。其中可靠性是指通過適當的配置可以保證:消息在傳輸的過程中不會丟失;所有的接收
者以相同的順序接受所有的消息;原子性:一個消息要么被所有的接收者接收,要么不被任何一個接收者都接收。目前在JBoss Application
Server Clustering,OSCache Clustering,Jetty HTTP session replication,?
Tomcat HTTP session replication中都使用了JGroups。
|
Unreliable | Reliable |
Unicast | UDP | TCP |
Multicast | IP Multicast | JGroups |
??? TCP和UDP是單播(Unicast)協議,也就是說:發送者和每一接收者之間是點對點傳輸。
如果一個發送者希望向多個接收者傳輸相同的數據,那么必須相應的復制多份數據。TCP是可靠的傳輸協議,但UDP不是可靠的,也就是說報文在傳輸的過程中
可能丟失、重復或著亂序,報文的最大尺寸也有限制。IP Multicast可以將消息同時發送到多個接收者。由于IP
Multicast是基于UDP的,因此IP Multicast是不可靠的。IP
Multicast需要一個特殊的組播地址,它是一組D類IP地址,范圍從224.0.0.0 到
239.255.255.255,其中有一部分地址是為特殊的目的保留的。JGroups使用UDP (IP
Multicast)、TCP、JMS作為傳輸協議。JGroups最強大的功能之一是提供了靈活的,可定制的協議棧,以滿足不同的需求。例如,如果選擇
使用IP
Multicast作為傳輸協議,那么為了防止報文丟失和重復,可以在協議棧中添加NAKACK協議;為了保證報文的順序,可以在協議棧中添加TOTAL
協議,以保證FIFO的順序;為了在組內成員發生變化時得到通知和回調,可以添加Group Membership Service (GMS) 和
FLUSH協議;Failure Detector
(FD)協議用于識別組內崩潰的成員;如果新加入的成員希望獲得組內其它成員維護的狀態,那么可以向協議棧中添加STATE_TRANSFER協議;如果
希望對傳輸的數據進行加密,那么可以使用CRYPT協議等等。
??? JGruops的主要功能有:
- 組的創建和刪除。組可以跨越LANs或者WANs。
- 加入組、主動或者被動(例如當機或者網絡故障)離開組。
- 在組成員改變時,組中其它成員可以得到通知。
- 向組中的單個或者多個成員發送消息。
??? 在JGroups中JChannel類提供了主要的API
,用于連接到集群(cluster)、發送和接收消息(Message)和注冊listeners等。Message包含消息頭(保存地址等信息)和一個
字節數組(保存希望傳輸的數據)。org.jgroups.Address接口及其實現類封裝了地址信息,它通常包含IP地址和端口號。連接到集群中的所
有實例(instance)被稱為一個視圖(org.jgroups.View)。通過View.getMembers()可以得到所有實例的地址。實例
只有在連接到集群后才能夠發送和接收消息。以相同name調用JChannel.connect(String
name)方法的所有實例會連接到同一個集群。當實例希望離開集群時,可以調用JChannel.disconnect()方法。當希望釋放占有的資源
時,可以調用JChannel.close()方法。JChannel.close()方法內部會調用JChannel.disconnect()方法。
???
通過調用JChannel.setReceiver()方法可以接收消息和得到View改變的通知。每當有實例加入或者離開集群的時
候,viewAccepted(View view)方法會被調用。View.toString()方法會打印出View中所有實例的地址,以及View
ID。需要注意的是,每次viewAccepted(View view)方法被調用時,view參數都不同,其View
ID也會增長。View內的第一個實例被稱為coordinator。Receiver接口上的getState(),setState()方法用于在實
例間傳遞狀態。新的實例通過setState()方法獲得通過狀態,而這個狀態是通過調用集群中其它某個實例上的getState()獲得的。
??? 以下是JGruops manual中的一個簡單的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- import ?java.util.LinkedList;??
- import ?java.util.List;??
- ??
- import ?org.jgroups.JChannel;??
- import ?org.jgroups.Message;??
- import ?org.jgroups.ReceiverAdapter;??
- import ?org.jgroups.View;??
- import ?org.jgroups.util.Util;??
- ??
- public ? class ?SimpleChat?{??
- ????//??
- ????private?JChannel?channel;??
- ????private?List<String>?state?=?new?LinkedList<String>();??
- ????private?String?userName?=?System.getProperty("user.name",?"WhiteSock");??
- ??????
- ????public?void?start()?throws?Exception?{??
- ????????//??
- ????????channel?=?new?JChannel();??
- ????????channel.setReceiver(new?ReceiverAdapter()?{??
- ??????????????
- ????????????public?void?receive(Message?msg)?{??
- ????????????????System.out.println(msg.getSrc()?+?":?"?+?msg.getObject());??
- ??????????????????
- ????????????????synchronized(state)?{??
- ????????????????????state.add((String)msg.getObject());??
- ????????????????}??
- ????????????}??
- ??????????????
- ????????????public?void?viewAccepted(View?view)?{??
- ????????????????System.out.println("view?accepted:?"?+?view);??
- ????????????}??
- ??????????????
- ????????????public?byte[]?getState()?{??
- ????????????????synchronized(state)?{??
- ????????????????????try?{??
- ????????????????????????return?Util.objectToByteBuffer(state);??
- ????????????????????}??
- ????????????????????catch(Exception?e)?{??
- ????????????????????????e.printStackTrace();??
- ????????????????????????return?null;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ??????????????
- ????????????@SuppressWarnings("unchecked")??
- ????????????public?void?setState(byte[]?new_state)?{??
- ????????????????try?{??
- ????????????????????List<String>?list=(List<String>)Util.objectFromByteBuffer(new_state);??
- ????????????????????synchronized(state)?{??
- ????????????????????????state.clear();??
- ????????????????????????state.addAll(list);??
- ????????????????????}??
- ????????????????????System.out.println("received?state?("?+?list.size()?+?"?messages?in?chat?history):");??
- ????????????????????for(String?str:?list)?{??
- ????????????????????????System.out.println(str);??
- ????????????????????}??
- ????????????????}??
- ????????????????catch(Exception?e)?{??
- ????????????????????e.printStackTrace();??
- ????????????????}??
- ????????????}??
- ????????});??
- ????????channel.connect("ChatCluster");??
- ????????channel.getState(null,?10000);??
- ??????????
- ????????//??
- ????????sendMessage();??
- ??????????
- ????????//??
- ????????channel.close();??
- ????}??
- ??????
- ????private?void?sendMessage()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while(true)?{??
- ????????????????System.out.print(">");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?{??
- ????????????????????Message?msg?=?new?Message(null,?null,?"["?+?userName?+?"]"?+?line);??
- ????????????????????channel.send(msg);??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??????
- ????public?static?void?main(String?args[])?throws?Exception?{??
- ????????new?SimpleChat().start();??
- ????}??
- }??
??? 在以上例子中,主線程會阻塞,直到從stdin中讀取一行。如果這行是"exit",那么程序退出,否則向集群中發送一個消息。如果集群中某個實例強行退 出,那么集群中的其它實例也會得到通知。Message構造函數的第一個參數如果是null,那么意味著消息將被發送到集群內所有的實例。
2 API
2.1 Interfaces
2.1.1 Transport
??? Transport接口只定義了最簡單的方法,用于發送和接收消息。其定義如下:
- public ? interface ?Transport?{??
- ????void?send(Message?msg)?throws?Exception;??
- ????Object?receive(long?timeout)?throws?Exception;??
- }??
2.1.2 MessageListener
??? 如果說Transport接口是以pull的方式接收消息,那么MessageListener則是以push的方式處理消息。當收到消息時,receive方法會被調用。getState() 和setState()方法用于在實例間傳遞狀態。其定義如下:
- public ? interface ?MessageListener?{??
- ????void?receive(Message?msg);??
- ????byte[]?getState();??
- ????void?setState(byte[]?state);??
- }??
2.1.3 ExtendedMessageListener
??? ExtendedMessageListener繼承自MessageListener,它定義了用來在實例間部分傳遞狀態的方法。如果需要傳遞的狀態數據量很大,那么通過配置協議棧,也可以指定使用流的方式傳遞狀態。其定義如下:
- public ? interface ?ExtendedMessageListener? extends ?MessageListener?{??
- ????byte[]?getState(String?state_id);??
- ????void?setState(String?state_id,?byte[]?state);??
- ??
- ????void?getState(OutputStream?ostream);??
- ????void?setState(InputStream?istream);??
- ??
- ????void?getState(String?state_id,?OutputStream?ostream);??
- ????void?setState(String?state_id,?InputStream?istream);??
- }??
2.1.4 MembershipListener
??? 當收到view、suspicion message和block event
的時候,相應的方法會被調用。這個接口常用的方法是viewAccepted(),以便在新的實例加入(或者離開)到集群時得到通知。當JGroups推
測某個實例可能崩潰時(此時該實例并未離開集群),suspect()方法會被調用,目前沒有unsuspect()方法。當JGroups需要通知集群
中的實例不要發送消息時,block()方法會被調用。這通常需要配置FLUSH協議,例如為了確保在進行狀態傳遞的時候,沒有實例在發送消息。在
block()方法返回后,所有發送消息的線程都會被阻塞,知道FLUSH協議解除阻塞。需要注意的是,block()方法內不應該執行耗時的操作,否則
整個FLUSH協議都會被阻塞。其定義如下:
- public ? interface ?MembershipListener?{??
- ????void?viewAccepted(View?new_view);??
- ????void?suspect(Address?suspected_mbr);??
- ????void?block();??
- }??
2.1.5 ExtendedMembershipListener
??? ExtendedMembershipListener繼承自MembershipListener。當FLUSH協議解除阻塞的時候,unblock()方法會被調用,所有發送消息的線程可以繼續發送消息。其定義如下:
- public ? interface ?ExtendedMembershipListener? extends ?MembershipListener?{??
- ????void?unblock();??
- }??
2.1.6 ChannelListener
??? 可以通過調用JChannel接口的addChannelListener(ChannelListener listener)方法來添加ChannelListener。當Channel被連接或者關閉時,相應的方法會北調用。其定義如下:
- public ? interface ?ChannelListener?{??
- ????void?channelConnected(Channel?channel);??
- ????void?channelDisconnected(Channel?channel);??
- ????void?channelClosed(Channel?channel);??
- ????void?channelShunned();??
- ????void?channelReconnected(Address?addr);??
- }??
2.1.7 Receiver
??? Receiver繼承自MessageListener和MembershipListener。其定義如下:
- public ? interface ?Receiver? extends ?MessageListener,?MembershipListener?{??
- }??
2.1.8 ExtendedReceiver
??? ExtendedReceiver繼承自Receiver、ExtendedMessageListener和ExtendedMembershipListener。其定義如下:
- public ? interface ?ExtendedReceiver? extends ?Receiver,?ExtendedMessageListener,?ExtendedMembershipListener?{??
- }??
?
2.2 Channel
2.2.1 Creating a channel
??? 最常見的創建Channel的方法是通過構造函數,此外也可以通過工廠方法。需要注意的是,集群中所有的實例必須有相同的協議棧。JChannel的構造函數之一如下:
- public ?JChannel(String?properties)? throws ?ChannelException?{??
- ????this(ConfiguratorFactory.getStackConfigurator(properties));??
- }??
??? 以上的構造函數中,properties參數是冒號分割的字符串,用來配置協議棧。字符串的最左端的元素定義了最底層的協議。如果properties為 null,那么將使用缺省的協議棧,即jgroups-all.jar中的udp.xml。以下是個properties參數的例子:
- String?props= "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" ?+??
- "PING(timeout=3000;num_initial_members=6):" ?+??
- "FD(timeout=5000):" ?+??
- "VERIFY_SUSPECT(timeout=1500):" ?+??
- "pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):" ?+??
- "UNICAST(timeout=300,600,1200):" ?+??
- "FRAG:" ?+??
- "pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=true)" ;??
??? 此外,也可以用File和URL作為構造函數的參數,這種方式允許以本地或者遠程的XML文件配置協議棧。XML文件的config節點中的每個子節點定 義一個協議,第一個子節點定義了最底層的協議。每個子節點名都對應一個Java類名,缺省的協議名不必是全限定類名,它們位于 org.jgroups.stack.protocols包中。如果是自定義的協議,那么則必須是全限定類名。每個協議可以有零個或多個屬性,以 name/value對的方式指定。以下是jgroups-all.jar中的udp.xml的內容:
- < config > ??
- ????<UDP??
- ?????????mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"??
- ?????????mcast_port="${jgroups.udp.mcast_port:45588}"??
- ?????????tos="8"??
- ?????????ucast_recv_buf_size="20000000"??
- ?????????ucast_send_buf_size="640000"??
- ?????????mcast_recv_buf_size="25000000"??
- ?????????mcast_send_buf_size="640000"??
- ?????????loopback="false"??
- ?????????discard_incompatible_packets="true"??
- ?????????max_bundle_size="64000"??
- ?????????max_bundle_timeout="30"??
- ?????????use_incoming_packet_handler="true"??
- ?????????ip_ttl="${jgroups.udp.ip_ttl:2}"??
- ?????????enable_bundling="true"??
- ?????????enable_diagnostics="true"??
- ?????????thread_naming_pattern="cl"??
- ??
- ?????????use_concurrent_stack="true"??
- ??
- ?????????thread_pool.enabled="true"??
- ?????????thread_pool.min_threads="2"??
- ?????????thread_pool.max_threads="8"??
- ?????????thread_pool.keep_alive_time="5000"??
- ?????????thread_pool.queue_enabled="true"??
- ?????????thread_pool.queue_max_size="1000"??
- ?????????thread_pool.rejection_policy="Run"??
- ??
- ?????????oob_thread_pool.enabled="true"??
- ?????????oob_thread_pool.min_threads="1"??
- ?????????oob_thread_pool.max_threads="8"??
- ?????????oob_thread_pool.keep_alive_time="5000"??
- ?????????oob_thread_pool.queue_enabled="false"??
- ?????????oob_thread_pool.queue_max_size="100"??
- ?????????oob_thread_pool.rejection_policy="Run"/>??
- ??
- ????<PING?timeout="2000"??
- ????????????num_initial_members="3"/>??
- ????<MERGE2?max_interval="30000"??
- ????????????min_interval="10000"/>??
- ????<FD_SOCK/>??
- ????<FD?timeout="10000"?max_tries="5"???shun="true"/>??
- ????<VERIFY_SUSPECT?timeout="1500"??/>??
- ????<BARRIER?/>??
- ????<pbcast.NAKACK?use_stats_for_retransmission="false"??
- ???????????????????exponential_backoff="150"??
- ???????????????????use_mcast_xmit="true"?gc_lag="0"??
- ???????????????????retransmit_timeout="50,300,600,1200"??
- ???????????????????discard_delivered_msgs="true"/>??
- ????<UNICAST?timeout="300,600,1200"/>??
- ????<pbcast.STABLE?stability_delay="1000"?desired_avg_gossip="50000"??
- ???????????????????max_bytes="1000000"/>??
- ????<VIEW_SYNC?avg_send_interval="60000"???/>??
- ????<pbcast.GMS?print_local_addr="true"?join_timeout="3000"??
- ????????????????shun="false"??
- ????????????????view_bundling="true"/>??
- ????<FC?max_credits="500000"??
- ????????????????????min_threshold="0.20"/>??
- ????<FRAG2?frag_size="60000"??/>??
- ????<!--pbcast.STREAMING_STATE_TRANSFER?/-->??
- ????<pbcast.STATE_TRANSFER??/>??
- ????<!--?pbcast.FLUSH??/-->??
- </ config > ??
??? 以上XML文件中,UDP協議的mcast_addr屬性被指定使用jgroups.udp.mcast_addr系統屬性,如果沒有配置這個系統屬性,那么使用缺省值228.10.10.10。
2.2.2 Setting options
??? 通過setOpt(int option, Object value)方法可以給Channel設置屬性,目前支持的屬性有:
- Channel.BLOCK 這是一個Boolean型的屬性,缺省是false。如果設置成true,那么會接收到block message。
- Channel.LOCAL這是一個Boolean型的屬性,缺省設置成true。如果是true,那么當集群中的實例向集群發送消息時,這個實例本身也會收到這個消息。
- Channel.AUTO_RECONNECT這是一個Boolean型的屬性,缺省是false。如果設置成true,那么shunned channel 在離開集群后,會自動重新加入到集群中。
- Channel.AUTO_GETSTATE 這是一個Boolean型的屬性,缺省是false。如果設置成true,那么shunned channel在自動重新加入到集群后,會自動嘗試向集群的coordinator 獲得集群的狀態(需要AUTO_RECONNECT也設置成true)。
??? 通過Object getOpt(int option)可以或者channel的相關屬性值。
2.2.3 Connecting/Disconnecting
??? 通過調用connect(String
cluster_name) throws
ChannelException方法連接到集群。cluster_name參數指定了集群的名稱。集群中的第一個實例被稱為coordinator。當
集群中的成員發生變化的時候,coordinator會向集群中的其它實例發送view message。
??? 也可以通過調用void
connect(String cluster_name, Address target, String state_id,long
timeout) throws
ChannelException方法連接到集群,并從集群中請求當前的狀態。與將這兩個操作分開進行相比,在一個方法調用內完成這兩個操作,可以允許
JGroups對發送的消息進行優化,更重要的是,從調用者角度來看,這兩個操作被合并成一個原子操作。cluster_name參數用于指定集群的名
稱。target參數指定了從集群中的哪個實例獲得狀態,如果該參數為null,那么會從coordinator獲得。如果希望傳遞部分的狀態,那么
state_id參數可以用于指定狀態id。
??? 當Channel連接到集群后,通過調用String
getClusterName()方法可以獲得當前連接到的集群名稱。通過調用Address
getLocalAddress()方法可以獲得channel的地址。通過調用View
getView()方法可以獲得channel的當前view。每當Channel收到view
message的時候,channel的當前view就會被更新。
??? 通過調用void
disconnect()方法以斷開到集群的連接。如果channel已經并沒有連接到集群,或者chaneel已經被close,那么調用這個方法沒有
任何效果。如果channel已經連接到集群,那么這個方法內會向coordinator發送一個離開請求,同時coordinator會向集群中的所有
其它實例發送view message,以通知它們該實例的離開。斷開連接的channel可以通過調用connect()方法重新連接到集群。
??? 通過void close()方法以釋放channel占有的資源。Channel被close之后,調用channel上的任何方法都可能會導致異常。
2.2.4 Sending messages
???
當channel連接到集群后,可以通過以下方法發送消息。第一個send方法接受一個Message型的參數,如果msg的目標地址不是null,那么
消息會發送到指定地址,否則會發送到集群內的所有成員。msg的源地址可以不必手工設置,如果是null,那么會被協議棧的最底層協議(傳輸協議)設置成
channel的本地地址。第二個send方法在內部使用了一個send方法。
- void ?send(Message?msg)? throws ?ChannelNotConnectedException,?ChannelClosedException???
- void ?send(Address?dst,?Address?src,?Serializable?obj)? throws ?ChannelNotConnectedException,?ChannelClosedException??
??? 以下是個發送消息的例子:
- Hashtable?data;??
- try ?{??
- ????Address?receiver=channel.getView().getMembers().first();??
- ????channel.send(receiver,?null,?data);??
- }??
- catch (Exception?ex)?{??
- ????//?handle?errors??
- }??
2.2.5 Receiving messages
??? Channel
以異步的方式從網絡上接收消息,然后把消息存放在一個無界隊列中。當調用receive()方法時,會嘗試返回隊列中取得下一個可用的消息。如果隊列中沒
有消息,那么會被阻塞。如果timeout小于等于零,那么會永遠等待下去;否則會等待timeout指定的毫秒數,直到收到消息或者拋出
TimeoutException。需要注意的是,JChannel. receive(long
timeout)方法已經被標記為deprecated。根據channel options的不同,receive()方法可能返回以下類型的對象:
- Message 普通消息。Message.makeReply()可以同于創建消息的應答,即以當前消息的源地址作為應答消息的目標地址。
- View 當集群的成員發生變化的時候,集群的每個成員都會收到view message。當兩個或者多個子集群(subgroups)合并成一個的時候,集群中的成員會收到MergeView message。如果需要在子集群合并時處理子集群狀態的合并,那么通常需要在單獨的線程里執行耗時的操作。
- SuspectEvent 當集群中的某個成員被懷疑崩潰時,集群中的其它成員會收到SuspectEvent message。通過調用SuspectEvent.getMember()可以得到可疑成員的地址。在收到這個消息后,通常還會收到view message。
- BlockEvent 當收到BlockEvent message后,實例應該停止發送消息,然后應該調用Channel.blockOk()方法(目前JChannel.blockOk()方法是一個空 方法)確認已經停止發送消息。當Channel.blockOk()方法調用完畢之后,所有發送消息的線程都會被阻塞直到FLUSH協議解除阻塞。為了接 收BlockEvent message,需要設置Channel.BLOCK屬性為true。
- UnblockEvent 當收到UnblockEvent message后,實例可以繼續發送消息。
- GetStateEvent 當收到GetStateEvent message后,實例應該保存當前的狀態,并將當前狀態的一份拷貝作為參數調用Channel.returnState()方法,然后JGroups會 將這個狀態返回給請求狀態的實例。為了接收GetStateEvent message,需要在協議棧中配置pbcast.STATE_TRANSFER協議。
- StreamingGetStateEvent當收到StreamingGetStateEvent message后,實例應該通過StreamingGetStateEvent.getArg()返回的輸出流返回狀態。為了接收 StreamingGetStateEvent message,需要在協議棧中配置pbcast.STREAMING_STATE_TRANSFER協議。
- SetStateEvent當收到SetStateEvent message后,實例應該通過SetStateEvent.getArg()返回的字節數組取得狀態。
- StreamingSetStateEvent當收到StreamingSetStateEvent message后,實例應該通過StreamingSetStateEvent.getArg()返回的輸入流取得狀態。為了接收 StreamingSetStateEvent message,需要在協議棧中配置pbcast.STREAMING_STATE_TRANSFER協議。
??? 以下是個使用pull方式接收消息的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- import ?java.util.HashMap;??
- import ?java.util.Iterator;??
- import ?java.util.Map;??
- ??
- import ?org.jgroups.BlockEvent;??
- import ?org.jgroups.Channel;??
- import ?org.jgroups.GetStateEvent;??
- import ?org.jgroups.JChannel;??
- import ?org.jgroups.Message;??
- import ?org.jgroups.SetStateEvent;??
- import ?org.jgroups.UnblockEvent;??
- import ?org.jgroups.View;??
- import ?org.jgroups.util.Util;??
- ??
- public ? class ?PollStyleReceiver? implements ?Runnable?{??
- ????//??
- ????private?JChannel?channel;??
- ????private?Map<String,?String>?state?=?new?HashMap<String,?String>();??
- ????private?String?properties?=?"UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):"?+??
- ????"PING(timeout=3000;num_initial_members=6):"?+??
- ????"FD(timeout=5000):"?+??
- ????"VERIFY_SUSPECT(timeout=1500):"?+??
- ????"pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):"?+??
- ????"UNICAST(timeout=300,600,1200):"?+??
- ????"FRAG:"?+??
- ????"pbcast.GMS(join_timeout=5000;shun=false;print_local_addr=true):"?+???
- ????"pbcast.STATE_TRANSFER:"?+???
- ????"pbcast.FLUSH";??
- ??????
- ??????
- ????public?void?start()?throws?Exception?{??
- ????????//??
- ????????channel?=?new?JChannel(properties);??
- ????????channel.connect("PollStyleReceiver");??
- ????????channel.setOpt(Channel.BLOCK,?Boolean.TRUE);??
- ????????channel.getState(null,?10000);??
- ??????????
- ????????new?Thread(this).start();??
- ??????????
- ????????sendMessage();??
- ??????????
- ????????channel.close();??
- ????}??
- ??????
- ????@SuppressWarnings({?"unchecked",?"deprecation"?})??
- ????public?void?run()?{??
- ????????while(true)?{??
- ????????????try?{??
- ????????????????Object?obj?=?channel.receive(0);??
- ????????????????if(obj?instanceof?Message)?{??
- ????????????????????System.out.println("received?a?regular?message:?"?+?(Message)obj);??
- ????????????????????String?s?=?(String)((Message)obj).getObject();??
- ????????????????????String?key?=?s.substring(0,?s.indexOf("="));??
- ????????????????????String?value?=?s.substring(s.indexOf("=")?+?1);??
- ????????????????????state.put(key,?value);??
- ????????????????}?else?if(obj?instanceof?View)?{??
- ????????????????????System.out.println("received?a?View?message:?"?+?(View)obj);??
- ????????????????}?else?if(obj?instanceof?BlockEvent)?{??
- ????????????????????System.out.println("received?a?BlockEvent?message:?"?+?(BlockEvent)obj);??
- ????????????????????channel.blockOk();??
- ????????????????}?else?if(obj?instanceof?UnblockEvent)?{??
- ????????????????????System.out.println("received?a?UnblockEvent?message:?"?+?(UnblockEvent)obj);??
- ????????????????}?else?if(obj?instanceof?GetStateEvent)?{??
- ????????????????????System.out.println("received?a?GetStateEvent?message:?"?+?(GetStateEvent)obj);??
- ????????????????????channel.returnState(Util.objectToByteBuffer(copyState(state)));??
- ????????????????}?else?if(obj?instanceof?SetStateEvent)?{??
- ????????????????????System.out.println("received?a?SetStateEvent?message:?"?+?(SetStateEvent)obj);??
- ????????????????????this.state?=?(Map<String,?String>)Util.objectFromByteBuffer(((SetStateEvent)obj).getArg());??
- ????????????????????System.out.println("current?state:?"?+?printState(this.state));??
- ????????????????}?else?{??
- ????????????????????System.out.println(obj);??
- ????????????????}??
- ????????????}?catch(Exception?e)?{??
- ????????????????e.printStackTrace();??
- ????????????????break;??
- ????????????}??
- ????????}??
- ????}??
- ??????
- ????private?void?sendMessage()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while(true)?{??
- ????????????????System.out.print(">?");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?if(line.indexOf("=")?>?0?||?line.indexOf("=")?==?line.length()?-?1){??
- ????????????????????Message?msg?=?new?Message(null,?null,?line);??
- ????????????????????channel.send(msg);??
- ????????????????}?else?{??
- ????????????????????System.out.println("invalid?input:?"?+?line);??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??????
- ????private?Map<String,?String>?copyState(Map<String,?String>?s)?{??
- ????????Map<String,?String>?m?=?new?HashMap<String,?String>();???
- ????????for(String?key?:?s.keySet())?{??
- ????????????m.put(key,?s.get(key));??
- ????????}??
- ????????return?m;??
- ????}??
- ??????
- ????private?String?printState(Map<String,?String>?s)?{??
- ????????StringBuffer?sb?=?new?StringBuffer();??
- ????????sb.append("[");??
- ????????for(Iterator<String>?iter?=?s.keySet().iterator();?iter.hasNext();?)?{??
- ????????????String?key?=?iter.next();??
- ????????????sb.append(key).append("=");??
- ????????????sb.append(s.get(key));??
- ????????????if(iter.hasNext())?{??
- ????????????????sb.append(",?");??
- ????????????}??
- ????????}??
- ????????sb.append("]");??
- ????????return?sb.toString();??
- ????}??
- ??????
- ????public?static?void?main(String?args[])?throws?Exception?{??
- ????????new?PollStyleReceiver().start();??
- ????}??
- }??
??? 程序啟動后,程序會將在命令行鍵入鍵值對(例如key1=value1)保存到HashMap中。并允許在不同的實例間傳遞狀態。
???
除了以poll的方式接收消息外,JGroups也支持以push的方式處理消息。通過向JChannel注冊Receiver,允許程序以回調的方式處
理消息,而不必啟動額外的線程來接收消息,同時JGroups在內部也不用使用無界隊列來保存消息。一下是個使用push方式處理消息的例子:
- JChannel?ch?=? new ?JChannel();??
- ch.setReceiver(new?ExtendedReceiverAdapter()?{??
- ????public?void?receive(Message?msg)?{??
- ????????System.out.println("received?message?"?+?msg);??
- ????}??
- ????public?void?viewAccepted(View?new_view)?{??
- ????????System.out.println("received?view?"?+?new_view);??
- ????}??
- });??
- ch.connect("bla");??
2.2.6 State transfer
??? JGroups支持在集群中維護和傳遞狀態(state),例如web
server的Http
Sessions等。集群中的某個實例可以通過JChannel.send()方法發送消息,從而把對狀態的修改同步到集群的其它實例中。當一個新的實例
加入到集群后,可以調用JChannel.getState()方法向集群中的某個實例(缺省是coordinator)請求獲得當前的狀態。需要注意的
是,JChannel.getState()方法返回的是boolean類型。如果該實例是集群中的第一個實例,那么該方法返回false(也就是說目前
沒有狀態),否則返回true。在接下來JChannel.receive()方法的返回值中會包含SetStateEvent
message,或者通過MembershipListener.setState()方法獲得狀態。JChannel.getState()方法不直接
返回狀態的原因是,如果JChannel的消息隊列中還有未被處理的消息,那么讓JChannel.getState()直接返回狀態,會破壞消息接收的
FIFO順序保證,傳遞的狀態也會不正確。
??? 假設某個集群中包含A、B 和C三個成員,當D加入到集群的時候,如果D調用了JChannel.getState(),那么會發生以下的調用序列:
- D 調用 JChannel.getState()方法。假設狀態從集群中的A成員取得。
- A 收到GetStateEvent message或者A注冊的Receiver的getState() 方法被調用。A返回了當前狀態。
- D 調用 JChannel.getState()方法返回,返回值是true。
- D 收到SetStateEvent message或者D注冊的Receiver的setState()方法被調用。D取得狀態。
??? 2.2.5節的例子中包含了狀態傳遞相關的代碼,需要注意的是,在調用JChannel.returnState()方法的時候,為了防止在狀態被通過網絡發送前,程序的其它地方對狀態進行了修改(比如接收到新的消息并更新狀態),需要傳遞當前狀態的一份拷貝。
??? 除了通過處理GetStateEvent 和 SetStateEvent消息來傳遞狀態之外,JGroups也支持通過Reciever傳遞狀態,例如在第一章中演示的例子。
??? JGroups支持傳遞部分狀態和以流的形式傳遞狀態,詳細內容可以參考JGroups Manual。
3 Building Blocks
??? Building
blocks位于org.jgroups.blocks包中,在邏輯上可以視為channels之上的一層,它提供了更復雜的接口。Building
blocks并不必依賴于channels,部分building
blocks只需要實現了Transport接口的類即可工作。以下簡要介紹部分building blocks。
3.1 MessageDispatcher
??? Channels
通常用于異步地發送和接收消息。然后有些情況下需要同步通信,例如發送者希望向集群發送消息并等待所有成員的應答,或者等待部分成員的應答。
MessageDispatcher支持以同步或者異步的方式發送消息,它在構造時需要一個Channel型的參數。
???
MessageDispatcher提供了Object handle(Message msg)方法,用于以push
方式的接收消息并返回應答(必須可以被序列化),該方法拋出的異常也會被傳播到消息發送者。MessageDispatcher在內部使用了
PullPushAdapter,PullPushAdapter也是org.jgroups.blocks包中的類,但是已經被標記為
deprecated。這種方式被稱為MessageDispatcher的server模式。
???
MessageDispatcher的client模式是指通過調用castMessage或者sendMessage向集群發送消息并同步或者異步的等
待應答。castMessage()方法向dests指定的地址發送消息,如果dest為null,那么向集群中所有成員發送消息。
castMessage()方法的返回值是RspList,RspList 實現了Map<Address,Rsp>
接口。msg參數中的目的地址會被覆蓋。mode參數(由org.jgroups.blocks.GroupRequest類定義)指定了消息是同步還是
異步發送,其可選值如下:
- GET_FIRST 返回收到的第一個應答。
- GET_ALL 等待所有成員的應答(被懷疑崩潰的成員除外)。
- GET_MAJORITY 等待絕大多數成員(相對與成員的個數)的應答。
- GET_ABS_MAJORITY等待絕大多數成員(一個絕對的數值,只計算一次)的應答。
- GET_N 等待n個應答,如果n大于成員的個數,可能會一直阻塞下去。
- GET_NONE 不等待應答,直接返回,即異步方式。
??? castMessage()方法的定義如下:
- public ?RspList?castMessage(Vector?dests,?Message?msg,? int ?mode,? long ?timeout);??
??? sendMessage()方法允許向一個成員發送消息,msg參數的目的地址不能為null。如果mode參數是GET_NONE,那么消息的發送變成 異步方式;否則mode參數會被忽略(缺省采用GET_FIRST)。sendMessage()方法的定義如下:
- public ?Object?sendMessage(Message?msg,? int ?mode,? long ?timeout)? throws ?TimeoutException;??
??? 以下是個使用MessageDispatcher的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- ??
- import ?org.jgroups.Channel;??
- import ?org.jgroups.JChannel;??
- import ?org.jgroups.Message;??
- import ?org.jgroups.blocks.GroupRequest;??
- import ?org.jgroups.blocks.MessageDispatcher;??
- import ?org.jgroups.blocks.RequestHandler;??
- import ?org.jgroups.util.RspList;??
- ??
- public ? class ?MessageDispatcherTest?{??
- ????//??
- ????private?Channel?channel;??
- ????private?MessageDispatcher?dispatcher;??
- ????private?boolean?propagateException?=?false;??
- ??
- ????public?void?start()?throws?Exception?{??
- ????????//??
- ????????channel?=?new?JChannel();??
- ????????dispatcher?=?new?MessageDispatcher(channel,?null,?null,?new?RequestHandler()?{??
- ??
- ????????????public?Object?handle(Message?msg)?{??
- ????????????????System.out.println("got?a?message:?"?+?msg);??
- ????????????????if(propagateException)?{??
- ????????????????????throw?new?RuntimeException("failed?to?handle?message:?"?+?msg.getObject());??
- ????????????????}?else?{??
- ????????????????????return?new?String("success");??
- ????????????????}??
- ????????????}??
- ??????????????
- ????????});??
- ????????channel.connect("MessageDispatcherTest");??
- ??????????
- ????????//??
- ????????sendMessage();??
- ??????????
- ????????//??
- ????????channel.close();??
- ????????dispatcher.stop();??
- ????}??
- ??????
- ????private?void?sendMessage()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while(true)?{??
- ????????????????System.out.print(">?");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?{??
- ????????????????????Message?msg?=?new?Message(null,?null,?line);??
- ????????????????????RspList?rl?=?dispatcher.castMessage(null,?msg,?GroupRequest.GET_ALL,?0);??
- ????????????????????System.out.println("Responses:\n"?+?rl);??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??
- ????public?static?void?main(String[]?args)?{??
- ????????try?{??
- ????????????new?MessageDispatcherTest().start();??
- ????????}?catch?(Exception?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- }??
3.2 RpcDispatcher
??? RpcDispatcher
繼承自MessageDispatcher,它允許遠程調用集群中其它成員上的方法,并可選地等待應答。跟MessageDispatcher相比,不需
要為RpcDispatcher指定RequestHandler。RpcDispatcher的構造函數接受一個Object
server_obj參數,它是遠程調用的目標對象。RpcDispatcher的callRemoteMethods系列方法用于遠程調用目標對象上的
方法,該方法可以由MethodCall指定,也可以通過方法名、參數類型指定。跟MessageDispatcher的castMessage()方法
和sendMessage()方法類似,callRemoteMethods系列方法也接受一個int
mode參數,其含義也相同。以下是個簡單的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- ??
- import ?org.jgroups.Channel;??
- import ?org.jgroups.JChannel;??
- import ?org.jgroups.blocks.GroupRequest;??
- import ?org.jgroups.blocks.RpcDispatcher;??
- import ?org.jgroups.util.RspList;??
- ??
- public ? class ?RpcDispatcherTest?{??
- ????private?Channel?channel;??
- ????private?RpcDispatcher?dispatcher;??
- ??
- ????public?int?print(int?number)?throws?Exception?{??
- ????????return?number?*?2;??
- ????}??
- ??
- ????public?void?start()?throws?Exception?{??
- ????????channel?=?new?JChannel();??
- ????????dispatcher?=?new?RpcDispatcher(channel,?null,?null,?this);??
- ????????channel.connect("RpcDispatcherTest");??
- ??????????
- ????????//??
- ????????sendMessage();??
- ??????????
- ????????//??
- ????????channel.close();??
- ????????dispatcher.stop();??
- ????}??
- ??????
- ????private?void?sendMessage()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while(true)?{??
- ????????????????System.out.print(">?please?input?an?int?value:");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?{??
- ????????????????????int?param?=?0;??
- ????????????????????try?{??
- ????????????????????????param?=?Integer.parseInt(line);??
- ????????????????????}?catch(Exception?e)?{??
- ????????????????????????System.out.println("invalid?input:?"?+?line);??
- ????????????????????????continue;??
- ????????????????????}??
- ????????????????????RspList?rl?=?dispatcher.callRemoteMethods(null,?"print",?new?Object[]{new?Integer(param)},?new?Class[]{int.class},?GroupRequest.GET_ALL,?0);??
- ????????????????????System.out.println("Responses:?\n"?+?rl);??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??
- ????public?static?void?main(String[]?args)?{??
- ????????try?{??
- ????????????new?RpcDispatcherTest().start();??
- ????????}?catch?(Exception?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- }??
3.3 ReplicatedHashMap
??? ReplicatedHashMap
繼承自ConcurrentHashMap,并在內部使用了RpcDispatcher。ReplicatedHashMap構造函數的
clustername參數指定了集群的名字,集群中所有的實例會包含相同的狀態。新加入的實例在開始工作前會從集群中獲得當前的狀態。對實例的修改(例
如通過put,remove方法)會傳播到集群的其它實例中,只讀的請求(例如get方法)則是本地調用。需要注意的
是,ReplicatedHashMap的以下劃線開頭的方法是用于RpcDispatcher的遠程調用的。在ReplicatedHashMap上可
以注冊 Notification,以便在實例的狀態改變時進行回調,所有的回調也是本地的。以下是個簡單的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- import ?java.util.Iterator;??
- import ?java.util.Map;??
- import ?java.util.Vector;??
- ??
- import ?org.jgroups.Address;??
- import ?org.jgroups.ChannelFactory;??
- import ?org.jgroups.JChannelFactory;??
- import ?org.jgroups.View;??
- import ?org.jgroups.blocks.ReplicatedHashMap;??
- ??
- public ? class ?ReplicatedHashMapTest? implements ?ReplicatedHashMap.Notification<String,?String>?{??
- ????//??
- ????private?ReplicatedHashMap<String,?String>?map;??
- ??
- ????public?void?start()?throws?Exception?{??
- ????????ChannelFactory?factory?=?new?JChannelFactory();??
- ????????map?=?new?ReplicatedHashMap<String,?String>("ReplicatedHashMapTest",?factory,?"udp.xml",?false,?10000);??
- ????????map.addNotifier(this);??
- ??
- ????????sendMessage();??
- ????????map.stop();??
- ????}??
- ??
- ????public?void?entryRemoved(String?key)?{??
- ????????System.out.println("in?entryRemoved("?+?key?+?")");??
- ????}??
- ??
- ????public?void?entrySet(String?key,?String?value)?{??
- ????????System.out.println("in?entrySet("?+?key?+?","?+?value?+?")");??
- ????}??
- ??
- ????public?void?contentsSet(Map<String,?String>?m)?{??
- ????????System.out.println("in?contentsSet("?+?printMap(m)?+?")");??
- ????}??
- ??
- ????public?void?contentsCleared()?{??
- ????????System.out.println("in?contentsCleared()");??
- ????}??
- ??
- ????public?void?viewChange(View?view,?Vector<Address>?newMembers,??
- ????????????Vector<Address>?oldMembers)?{??
- ????????System.out.println("in?viewChange("?+?view?+?")");??
- ????}??
- ??
- ????private?void?sendMessage()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while?(true)?{??
- ????????????????System.out.print(">?");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if?(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?{??
- ????????????????????if?(line.equals("show"))?{??
- ????????????????????????System.out.println(printMap(map));??
- ????????????????????}?else?if?(line.equals("clear"))?{??
- ????????????????????????map.clear();??
- ????????????????????}?else?if?(line.startsWith("remove?"))?{??
- ????????????????????????String?key?=?line.substring(line.indexOf("?")?+?1,?line.length()).trim();??
- ????????????????????????map.remove(key);??
- ????????????????????}?else?if?(line.startsWith("put?"))?{??
- ????????????????????????line?=?line.replace("put?",?"");??
- ????????????????????????int?index?=?line.indexOf("=");??
- ????????????????????????if?(index?<=?0?||?index?>=?(line.length()?-?1))?{??
- ????????????????????????????System.out.println("invalid?input");??
- ????????????????????????????continue;??
- ????????????????????????}??
- ????????????????????????String?key?=?line.substring(0,?index).trim();??
- ????????????????????????String?value?=?line.substring(index?+?1,?line.length())??
- ????????????????????????????????.trim();??
- ????????????????????????map.put(key,?value);??
- ????????????????????}?else?{??
- ????????????????????????System.out.println("invalid?input:?"?+?line);??
- ????????????????????????continue;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if?(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if?(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??
- ????private?String?printMap(Map<String,?String>?m)?{??
- ????????StringBuffer?sb?=?new?StringBuffer();??
- ????????sb.append("[");??
- ????????for?(Iterator<String>?iter?=?map.keySet().iterator();?iter.hasNext();)?{??
- ????????????String?key?=?iter.next();??
- ????????????String?value?=?map.get(key);??
- ????????????sb.append(key).append("=").append(value);??
- ????????????if?(iter.hasNext())?{??
- ????????????????sb.append(",");??
- ????????????}??
- ????????}??
- ????????sb.append("]");??
- ????????return?sb.toString();??
- ????}??
- ??
- ????public?static?void?main(String?args[])?{??
- ????????try?{??
- ????????????new?ReplicatedHashMapTest().start();??
- ????????}?catch?(Exception?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
- }??
3.4 NotificationBus
??? NotificationBus
提供了向集群發送通知的能力,通知可以是任何可以被序列化的對象。NotificationBus在內部使用Channel,其start()和
stop()方法用于啟動和停止。NotificationBus的setConsumer()方法用于注冊Consumer接口,其定義如下:
- public ? interface ?Consumer?{??
- ????void?handleNotification(Serializable?n);??
- ????Serializable?getCache();??
- ????void?memberJoined(Address?mbr);??
- ????void?memberLeft(Address?mbr);??
- }??
??? NotificationBus的getCacheFromCoordinator() 和getCacheFromMember()用于請求集群的狀態。前者是從coordinator得到狀態,后者從指定地址的成員處得到狀態。 NotificationBus上注冊的Consumer需要實現getCache()方法以返回狀態。以下是個簡單的例子:
- import ?java.io.BufferedReader;??
- import ?java.io.InputStreamReader;??
- import ?java.io.Serializable;??
- import ?java.util.Iterator;??
- import ?java.util.LinkedList;??
- ??
- import ?org.jgroups.Address;??
- import ?org.jgroups.blocks.NotificationBus;??
- ??
- public ? class ?NotificationBusTest? implements ?NotificationBus.Consumer?{??
- ????//??
- ????private?NotificationBus?bus;??
- ????private?LinkedList<Serializable>?cache;??
- ??
- ????public?void?handleNotification(Serializable?n)?{??
- ????????System.out.println("in?handleNotification("?+?n?+?")");??
- ????????if?(cache?!=?null)?{??
- ????????????cache.add(n);??
- ????????}??
- ????}??
- ??
- ????public?Serializable?getCache()?{??
- ????????return?cache;??
- ????}??
- ??
- ????public?void?memberJoined(Address?mbr)?{??
- ????????System.out.println("in?memberJoined("?+?mbr?+?")");??
- ????}??
- ??
- ????public?void?memberLeft(Address?mbr)?{??
- ????????System.out.println("in?memberLeft("?+?mbr?+?")");??
- ????}??
- ??
- ????@SuppressWarnings("unchecked")??
- ????public?void?start()?throws?Exception?{??
- ????????//??
- ????????bus?=?new?NotificationBus("NotificationBusTest",?null);??
- ????????bus.setConsumer(this);??
- ????????bus.start();??
- ????????cache?=?(LinkedList<Serializable>)?bus.getCacheFromCoordinator(3000,?1);??
- ????????if?(cache?==?null)?{??
- ????????????cache?=?new?LinkedList<Serializable>();??
- ????????}??
- ????????System.out.println(printCache(cache));??
- ??
- ????????//??
- ????????sendNotification();??
- ??
- ????????//??
- ????????bus.stop();??
- ????}??
- ??
- ????private?void?sendNotification()?throws?Exception?{??
- ????????boolean?succeed?=?false;??
- ????????BufferedReader?br?=?null;??
- ????????try?{??
- ????????????br?=?new?BufferedReader(new?InputStreamReader(System.in));??
- ????????????while?(true)?{??
- ????????????????System.out.print(">?");??
- ????????????????System.out.flush();??
- ????????????????String?line?=?br.readLine();??
- ????????????????if?(line?!=?null?&&?line.equals("exit"))?{??
- ????????????????????break;??
- ????????????????}?else?{??
- ????????????????????bus.sendNotification(line);??
- ????????????????}??
- ????????????}??
- ????????????succeed?=?true;??
- ????????}?finally?{??
- ????????????if?(br?!=?null)?{??
- ????????????????try?{??
- ????????????????????br.close();??
- ????????????????}?catch?(Exception?e)?{??
- ????????????????????if?(succeed)?{??
- ????????????????????????throw?e;??
- ????????????????????}??
- ????????????????}??
- ????????????}??
- ????????}??
- ????}??
- ??
- ????private?String?printCache(LinkedList<Serializable>?c)?{??
- ????????StringBuffer?sb?=?new?StringBuffer();??
- ????????sb.append("[");??
- ????????for?(Iterator<Serializable>?iter?=?c.iterator();?iter.hasNext();)?{??
- ????????????sb.append(iter.next());??
- ????????????if?(iter.hasNext())?{??
- ????????????????sb.append(",");??
- ????????????}??
- ????????}??
- ????????sb.append("]");??
- ????????return?sb.toString();??
- ????}??
- ??
- ????public?static?void?main(String[]?args)?{??
- ????????try?{??
- ????????????new?NotificationBusTest().start();??
- ????????}?catch?(Exception?e)?{??
- ????????????e.printStackTrace();??
- ????????}??
- ????}??
-
}
4 Protocol Stack
4.1 Transport protocols
??? Transport protocols是指協議棧中最底層的協議,它們負責發送和接收消息。JGgroups提供了以下幾種transport protocols。
4.1.1 UDP
??? JGroups中的UDP協議使用IP multicast向集群發送消息,使用UDP
datagram向單個的成員發送unicast消息。啟動后會打開兩個socket,分別是multicast socket和unicast
socket。Channel的地址是unicast socket的地址和端口號。UDP通常用于集群中的成員分布于LAN內的情況。
??? 如果使用UDP和PING做為協議棧的底層協議,那么JGroups會使用IP
multicast發現集群中的成員,以及向集群發送發送消息。然而,如果IP
multicast在子網間被禁用,那么可以設置UDP的ip_mcast屬性為false,以便指定UDP使用多個unicast
messages向集群發送消息,而不是使用multicast
message。此外,還需要設置PING的gossip_系列屬性,以便指定PING使用GossipRouter來發現集群中的其它成員。需要注意的
是,對GossipRouter的依賴可能會導致single point of failure,而且系統的可伸縮性也比較差。
??? 在啟動任何成員之前,首先要啟動GossipRouter(否則成員需要處理MergeView消息用于合并subgroup的狀態),例如:
- java?org.jgroups.stack.GossipRouter?-port? 5555 ?-bindaddress?localhost??
??? UDP和PING的配置如下:
- < UDP ? ip_mcast = "false" ? /> ??
- < PING ? gossip_host = "localhost" ? gossip_port = "5555" ? gossip_refresh = "15000" ? timeout = "2000" ? num_initial_members = "3" /> ??
4.1.2 TCP
??? 當集群中的成員分布于WAN時(路由器會丟棄IP multicast報文),TCP可能是唯一可用的傳輸協議。當使用TCP作為傳輸協議是,可用的發現協議有:
- PING with GossipRouter: 跟4.1.1中介紹的一樣,p_mcast屬性必須設置成false,GossipRouter 也必須先于集群中的成員啟動。
- TCPPING: 從特定已知的成員處得到集群中其它成員的信息。
- TCPGOSSIP: 除了允許多個GossipRouters 之外,TCPGOSSIP 跟PING相同。
??? 以下是個使用TCP和TCPPING的例子:
- < TCP ? start_port = "7800" ? /> ?+??
- < TCPPING ? initial_hosts = "HostA[7800],HostB[7800]" ? port_range = "5" ? timeout = "3000" ? num_initial_members = "3" ? /> ??
??? 使用TCPPING的優點是不需要額外GossipRouters,而是從集群的成員中選擇那些已知的成員,例如以上例子中的HostA[7800]和 HostB[7800],并從這些成員處得到其它成員的信息。TCP協議的start_port="7800"屬性指定了選擇7800作為端口號,如果該 端口號被占用,那么嘗試下一個(7801)端口號,直到找到可用的端口號。TCPPING協議會嘗試連接HostA和HostB,連接的端口號的范圍是從 7800到7800 + port_range -1(在以上例子中是7804)。
??? 以下是個使用TCP和TCPGOSSIP的例子:
- < TCP ? /> ??
- < TCPGOSSIP ? initial_hosts = "localhost[5555],localhost[5556]" ? gossip_refresh_rate = "10000" ? num_initial_members = "3" ? /> ??
??? 以上例子中,initial_hosts 屬性用于指定GossipRouter的地址和端口號。GossipRouter需要先于集群中的成員啟動。
4.2 Reliable Message
4.2.1 pbcast.NAKACK
???
NAKACK協議保證了向集群的所有成員發送的消息的傳輸可靠性,以及消息的FIFO順序。消息傳輸的可靠性是指發送的消息不會丟失。此外發送者將發送的
消息編號,如果接收者沒有收到特定編號的消息,那么發送者會收到重新發送的請求。FIFO順序是指接收者會以消息發送的順序接收消息。以下是部分
NAKACK協議的屬性:
- retransmit_timeout 逗號分割的一系列毫秒數。例如100,200,400,800,1600。在第一次發送重傳輸請求前等待100ms,第二次發送重傳輸請求前等待 200ms,依此類推直到等待1600ms。從這以后,每次發送重傳輸請求前等待100ms。
- use_mcast_xmit 當某個成員接收到P成員發送的對于消息M的重傳輸請求,該成員會向P重新發送消息M。考慮到集群中的其它成員也可能沒有收到消息M,如果 use_mcast_xmit設置為true,那么該成員會向整個集群重新發送消息M。如果使用UDP作為傳輸協議,那么JGroups使用IP Multicast;如果使用TCP作為傳輸協議,那么會發送n-1個unicast消息(n是集群中消息的個數)。
- use_mcast_xmit_req 跟use_mcast_xmit屬性類似,不同之處在于對重傳輸的請求消息進行組播發送。
- xmit_from_random_member 如果xmit_from_random_member設置為true,那么JGroups會從集群的成員的中隨機挑選一個成員,并向這個成員發送重傳輸請 求。這樣做優點是對于進行了負載均衡,缺點是隨機挑選的那個成員可能也沒有收到消息,在這種情況下還需要繼續發送重傳輸請求。需要注意的是,如果這個屬性 設置為true,那么discard_delivered_msgs屬性必須設置為false。
- discard_delivered_msgs 如果discard_delivered_msgs設置為true,那么集群中的成員不會緩存其它成員發送的消息(因此不需要STABLE協議來對這些消息進行垃圾收集)。這意味著重傳輸請求只能發送給消息的最初發送者。
- max_xmit_buf_size 通常收到的消息會緩存在retransmission table中,這個屬性指定了retransmission table的上限。如果retransmission table達到上限,那么舊的項目會被丟棄。需要主要的是,設置這個屬性可能導致消息丟失。
4.2.2 UNICAST
??? UNICAST協議保證了單獨的發送者和接收者之間傳遞的消息的傳輸可靠性,以及消息的FIFO順序。在可靠的傳輸協議(例如TCP)之上,
UNICAST協議并不是必須的。然而,UNICAST可以防止相同發送者上的并發的消息傳遞。除非希望如此,否則應該在協議棧中包含UNICAST協
議。
??? 以下是部分UNICAST協議的屬性:
- retransmit_timeout 逗號分割的一系列毫秒數。例如100,200,400,800,1600。在第一次發送重傳輸請求前等待100ms,第二次發送重傳輸請求前等待 200ms,依此類推直到等待1600ms。從這以后,每次發送重傳輸請求前等待100ms。
4.3 Failure Detection
??? Failure detection 的目的是檢測集群內的成員是否崩潰。當某個成員被懷疑崩潰時,那么會向集群中的每個成員發送SUSPECT
消息,以進行通知。需要注意的時是,Failure detection
并不負責從集群中清除崩潰的成員(實際上是由GMS協議負責),它只是負責發現可能已經崩潰的成員,并通知集群中的其它成員。
4.3.1 FD
??? FD協議基于心跳消息。如果在timeout指定的毫秒內沒有接收到某個成員的應答,并且在嘗試了max_tries 指定的次數后,那么這個成員會被標記為可疑,并將被GMS協議從集群中清除。
每個成員向其右側的鄰居(當前view的成員列表中,該成員的下一個成員。列表中最后的成員的右側鄰居是列表的第一個成員)發送帶有
FdHeader.HEARTBEAT頭的消息。當鄰居收到這個消息后,它會應答帶有FdHeader.HEARTBEAT_ACK頭的消息。每當收到應
答時,FD協議的last_ack屬性會被更新成當前的時間,num_tries也會設置為0。如果當前時間和last_ack之差大于timeout指
定的毫秒數,那么FD協議會最多嘗試max_tries 指定的次數,如果仍然沒有收到應答,那么這個鄰居會被標記為可疑。
4.3.2 FD_SOCK
??? FD_SOCK協議基于一個有TCP sockets組成的環,即集群中的每個成員都通過TCP
socket連接到右側的鄰居(當前view的成員列表中,該成員的下一個成員。列表中最后的成員的右側鄰居是列表的第一個成員)。當某個成員檢測到它的
鄰居非正常地關閉了TCP socket之后,那么它會把這個鄰居標記為可疑。
4.4 Miscellaneous
4.4.1 STABLE
??? 為了響應可能的重傳輸請求,集群中的成員需要保存一定數量的消息直到它確定這些消息已經被集群中所有的成員成功地接收。對于某個消息M來說,
message stability
意味著M已經被集群中所有的成員接收。STABLE協議周期性地(或者收到消息的字節數達到的配置的上限)向集群中的所有成員發送stable
messages,這些消息中包含了特定成員收到的最大序號。當集群中的每個成員都收到了其它所有成員的stable
messages后,可以計算出目前每個成員已經收到的消息的最小序號,接下來這個序號被發送到集群中每個成員,最后每個成員會從自己的
retransmission
tables中刪除小于這個最小序號的最小消息。需要注意的是,如果沒有在協議棧中配置STABLE,那么可能會導致內存耗盡。以下是個配置STABLE
協議的例子:
- < pbcast.STABLE ? stability_delay = "1000" ? desired_avg_gossip = "50000" ? max_bytes = "1000000" /> ??
??? 以上例子中stability_delay屬性指定,在發送消息前等待1~1000毫秒,以避免所有的成員同時發送消息。 desired_avg_gossip屬性指定發送stable messages的周期,單位是毫秒,如果是0,那么禁用周期檢查。max_bytes指定了在發送stable message消息前,接收到的消息的最大字節數。
4.4.2 pbcast.FLUSH
??? 4.2 Reliable Message中介紹了保證消息可靠傳輸的協議,但是在某些情況下,這種保證是不夠的,考慮以下情況:
集群中某個成員A向集群發送消息M1,此時A的當前View是V1={A,B,C},也就是說A認為M1將發送到A(如果Channel.LOCAL選項是true)、B和C。正在此時,D也加入到集群中,那么D可能會,也可能不會收到M1。
通過在協議棧中配置FLUSH協議可以保證:
- 發送到V1的消息只會被傳遞到V1。所以以上例子中D不會收到M1。
- 在安裝V2前,集群中所有的成員都收到相同的消息。例如一個集群V1={A,B,C}中,C發送了5個消息,A收到了C發送的這5個消息,但是B 只收到了其中前3個。如果此時C崩潰,那么FLUSH協議會保證,在安裝V2={A,B} 前,B會收到所有C發送過的消息。在這種情況下,A會向B發送后兩個消息。
??? 通常,在以下兩種情況下需要使用FLUSH協議:
- State transfer 當某個成員請求狀態傳遞時,它通知其它成員停止發送消息并等待響應。接下來coordinator會將狀態發送給這個成員。當該成員接收到狀態后,它通知其它成員可以繼續發送消息。
- View changes 在安裝新的view時,所有發送到V1的消息都會被傳遞到V1。
??? FLUSH協議通常在STATE_TRANSFER、STATE_TRANSFER 或者 GMS
協議之上。此外需要注意的時,FLUSH協議必須是協議棧的最上層協議。除了JGroups自動處理FLUSH之外,JGroups也允許開發人員顯式調
用 Channel.startFlush()方法發起flush。在Channel.startFlush()方法返回后,在調用
Channel.stopFlush()方法之前,可以保證集群中的所有成員不能發送消息,而且Channel.startFlush()方法調用前發送
的消息都會被所有成員接收。在調用了Channel.stopFlush()方法之后,集群中的所有成員可以繼續發送消息。
???
如果將Channel.BLOCK屬性設置為true(缺省是false),那么可以在flush階段得到通知。如果采用poll方式,那么在某個成員調
用Channel.startFlush()方法后,其它成員會收到EVENT.BLOCK消息,這些成員應該發送EVENT.BLOCK_OK消息進行
響應。如果采用push方式,那么channel上注冊的MembershipListener的block()方法會被調用。
4.4.3 MERGE2
??? 假設由于某種原因(例如switch故障),某個集群{A,B,C,D,E},分裂為兩個子集群{A,B,C}
和{D,E},A、B和C可以互相ping通,D和E可以互相ping通,但是A、B和C卻ping不通D和E。在這種情況下,由于兩個子集群獨立工作,
會導致這兩個子集群的狀態并不相同。當故障解除后,MERGE2協議會通知集群中的成員,這兩個子集群將合并成一個。
???
至于如何處理狀態的合并,需要應用程序自己決定,這是因為JGroups并不了解集群的狀態。需要注意的是,用于合并的狀態的代碼應該在單獨的線程中執
行。一種簡單的處理方式是對于原來是主子集群中的成員不做任何處理,對于其它的成員則丟棄當前狀態,從合并后集群的coordinator處重新獲得狀
態。