MetaQ cluster installation and testing
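1. Install a ZooKeeper cluster (look up the installation steps yourself).
2. Download the source from https://github.com/killme2008/Metamorphosis/tree/metamorphosis-all-1.4.6.2. If you don't want to compile it yourself, you can download a prebuilt package directly from http://fnil.net/downloads/index.html. I chose to compile it myself, mainly so that if problems come up later I can modify the source and rebuild.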
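3. Build with Maven (set up the Maven environment yourself first). After downloading the "all" project, you need to build its subproject metamorphosis-server-wrapper. From a DOS prompt, change into that directory and run mvn eclipse:eclipse, then import the project into Eclipse and build it with the Eclipse plugin. Alternatively, run mvn clean install -Dmaven.test.skip=true directly in that directory. When the build finishes, the jar is generated under the target directory.
You can also create a lib folder in the project and run: mvn dependency:copy-dependencies -DoutputDirectory=lib (without -DoutputDirectory, the dependencies are written to target/dependency by default). Then copy the jar produced by install into lib as well.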
4. After the build completes, upload the result to the server.
The conf/server.ini file needs to be modified:
[system]
brokerId=2
numPartitions=1
serverPort=8123
dashboardHttpPort=8120
unflushThreshold=0
unflushInterval=10000
maxSegmentSize=1073741824
maxTransferSize=1048576
deletePolicy=delete,168
deleteWhen=0 0 6,18 * * ?
flushTxLogAtCommit=1
stat=true
dataPath=/data1/metaq/data
dataLogPath=/data1/metaq/log

[zookeeper]
zk.zkConnect=192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
zk.zkSessionTimeoutMs=30000
zk.zkConnectionTimeoutMs=30000
zk.zkSyncTimeMs=5000

;; Topics section
[topic=test]
[topic=meta-test]
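For a cluster, the settings highlighted in red in the configuration above need to be changed. brokerId only has to be different on every server node.
If you specify dataPath and dataLogPath yourself, create those directories with mkdir on every server.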
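As a minimal sketch of the "different brokerId on every node" rule, the snippet below assigns sequential ids to a list of hosts and prints the lines that differ per node. BrokerIdPlanner and its methods are hypothetical helpers written for illustration, not part of MetaQ, and the host addresses are placeholders; the two paths are the ones from the server.ini above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of MetaQ): plans one distinct brokerId per host.
class BrokerIdPlanner {

    /** Maps each host to a brokerId, starting at firstId and counting up. */
    static Map<String, Integer> assign(List<String> hosts, int firstId) {
        Map<String, Integer> ids = new LinkedHashMap<>();
        int next = firstId;
        for (String host : hosts) {
            if (ids.containsKey(host)) {
                // The same host listed twice would hide a planning mistake.
                throw new IllegalArgumentException("duplicate host: " + host);
            }
            ids.put(host, next++);
        }
        return ids;
    }

    /** Renders the server.ini lines that vary from node to node. */
    static String renderOverrides(int brokerId, String dataPath, String dataLogPath) {
        return "brokerId=" + brokerId + "\n"
             + "dataPath=" + dataPath + "\n"
             + "dataLogPath=" + dataLogPath + "\n";
    }

    public static void main(String[] args) {
        // Placeholder broker hosts; replace with your own nodes.
        List<String> hosts = Arrays.asList("192.168.1.1", "192.168.1.2", "192.168.1.3");
        for (Map.Entry<String, Integer> entry : assign(hosts, 1).entrySet()) {
            System.out.println("# " + entry.getKey());
            System.out.print(renderOverrides(entry.getValue(),
                    "/data1/metaq/data", "/data1/metaq/log"));
        }
    }
}
```

The printed lines can be pasted into the [system] section of each node's conf/server.ini; everything else in the file can stay identical across nodes.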
Distribute the package to every node, then run metaServer.sh start in the bin directory on each node.
To stop a broker, run metaServer.sh stop.
To check its status, run sh metaServer.sh status.
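Besides the status command, a quick way to confirm that a broker is listening is to try a TCP connection to its serverPort (8123 in the configuration above). The following is only a sketch: BrokerPortCheck is a hypothetical helper, it uses nothing but the JDK, and a successful connection shows that something accepts connections on that port, not that the broker is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of MetaQ): plain TCP reachability check.
class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unreachable: treat all as "not open".
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are assumptions: local host and the serverPort from server.ini.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8123;
        String state = isPortOpen(host, port, 2000) ? "is accepting connections" : "is not reachable";
        System.out.println(host + ":" + port + " " + state);
    }
}
```

Run it once per node, for example java BrokerPortCheck 192.168.1.1 8123, after starting the brokers.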
5. Usage example
package com.test.metaq;

import java.util.concurrent.Executor;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.consumer.MessageListener;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class AsyncConsum {
    public static void main(String[] args) {
        // Point the client at the ZooKeeper ensemble the brokers register with.
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = "192.168.1.1:2181";
        metaClientConfig.setZkConfig(zkConfig);

        final String topic = "test";
        final String group = "meta-example";
        try {
            // Creating the factory inside the try avoids using a null factory
            // when the connection fails.
            MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
            MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
            // Subscribe with a maximum fetch size of 1 MB per request.
            consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
                // Method name spelled as in the original listing.
                public void recieveMessages(Message message) {
                    System.out.println("Receive message " + new String(message.getData()));
                }

                // Returning null means no separate executor is supplied.
                public Executor getExecutor() {
                    return null;
                }
            });
            // Subscriptions take effect only after completeSubscribe().
            consumer.completeSubscribe();
        } catch (MetaClientException e) {
            e.printStackTrace();
        }
    }
}
package com.test.metaq;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import com.taobao.metamorphosis.Message;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.client.producer.SendResult;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

public class Products {
    public static void main(String[] args) {
        // Point the client at the ZooKeeper ensemble the brokers register with.
        final MetaClientConfig metaClientConfig = new MetaClientConfig();
        final ZKConfig zkConfig = new ZKConfig();
        zkConfig.zkConnect = "192.168.1.1:2181";
        metaClientConfig.setZkConfig(zkConfig);

        final String topic = "test";
        try {
            // Creating the factory inside the try avoids using a null factory
            // when the connection fails.
            MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
            MessageProducer producer = sessionFactory.createProducer();
            // A topic must be published before messages can be sent to it.
            producer.publish(topic);

            // Send every line typed on standard input as one message.
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = reader.readLine()) != null) {
                SendResult sendResult = producer.sendMessage(new Message(topic, line.getBytes()));
                if (!sendResult.isSuccess()) {
                    System.err.println("Send message failed,error message:" + sendResult.getErrorMessage());
                } else {
                    System.out.println("Send message successfully,sent to " + sendResult.getPartition());
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (MetaClientException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
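Package the classes as test.jar, upload it to the server, and run java -cp test.jar com.test.metaq.Products, then type messages on the command line.
With the same test.jar on the server, run java -cp test.jar com.test.metaq.AsyncConsum; its console prints the messages as they are received.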
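One detail to watch in the two classes above: the producer calls line.getBytes() and the consumer calls new String(message.getData()), both of which use the JVM's default charset. If the producer and consumer run on machines with different defaults, non-ASCII messages may come out garbled. A minimal sketch of fixing the charset explicitly is shown below; PayloadCodec is a hypothetical helper for illustration, not a MetaQ class.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of MetaQ): fixes the payload charset to UTF-8.
class PayloadCodec {

    /** Encodes message text the same way on every machine. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a payload produced by encode(). */
    static String decode(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "\u4f60\u597d" is a two-character Chinese greeting, written with escapes
        // so the source file itself stays ASCII.
        byte[] payload = encode("\u4f60\u597d, MetaQ");
        System.out.println(payload.length + " bytes -> " + decode(payload));
    }
}
```

In the examples above this would mean passing PayloadCodec.encode(line) to the Message constructor in the producer and printing PayloadCodec.decode(message.getData()) in the consumer.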
posted on 2014-04-24 10:18 by 順其自然EVO