paulwong

          #

          2014年值得關注的十個Hadoop大數據創業公司

          開源大數據框架Apache Hadoop已經成了大數據處理的事實標準,同時也幾乎成了大數據的代名詞,雖然這多少有些以偏概全。

          根據Gartner的估計,目前的Hadoop生態系統市場規模在7700萬美元左右,2016年,該市場規模將快速增長至8.13億美元。

          但是在Hadoop這個快速擴增的藍海中游泳并非易事,不僅開發大數據基礎設施技術產品這件事很難,銷售起來也很難,具體到大數據基礎設施工具如 Hadoop、NoSQL數據庫和流處理系統則更是難上加難。客戶需要大量培訓和教育,付費用戶需要大量支持和及時跟進的產品開發工作。而跟企業級客戶打 交道往往并非創業公司團隊的強項。此外,大數據基礎設施技術創業通常對風險投資規模也有較高要求。

          盡管困難重重,Hadoop創業公司依然如雨后春筍冒出,除了Cloudera、Datameer、DataStax和MapR等已經功成名就的 Hadoop創業公司外,最近CIO雜志評出了2014年十大最值得關注的Hadoop創業公司,了解這些公司的產品和商業模式對企業大數據技術創業者和 大數據應用用戶來說都非常有參考價值:

          一、Platfora

          platfora

          業務:所提供的大數據分析解決方案能夠將Hadoop中的原始數據轉換成可互動的,基于內存計算的商業智能服務。

          簡介:創立于2011年,迄今已募集6500萬美元。

          入選理由:Platfora的目標是簡化復雜難用的Hadoop,推動Hadoop在企業市場的應用。Platfora的做法是簡化數據采集和分析 流程,將Hadoop中的原始數據自動轉化成可以互動的商業智能服務,無需ETL或者數據倉庫。(參考閱讀:Hadoop只是窮人的ETL)

           

          二、Alpine Data Labs

          alpine data

          業務:提供基于Hadoop的數據分析平臺

          簡介:創立于2010年,迄今累計融資2350萬美元。

          入選理由:復雜的高級分析和機器學習應用通常都需要腳本和代碼開發高手實現,這進一步推高了數據科學家的技術門檻。實際上大數據企業高管和IT經理都沒時間也沒興致學習編程技術,或者去了解復雜的Hadoop。Alpine Data通過SaaS服務的方式大幅降低了預測分析的應用門檻。

           

          三、Altiscale

          altiscale

          業務:提供Hadoop即服務(HaaS)

          簡介:創立于2012年3月,迄今融資1200萬美元。

          入選理由:大數據正在鬧人才荒,而通過云計算提供Hadoop相關服務無疑是普及Hadoo的一條捷徑,根據TechNavio的估計,2016年 HaaS市場規模將高達190億美元,是塊大蛋糕。但是HaaS市場的競爭已經日趨激烈,包括亞馬遜EMR、微軟的Hadoop on Azure,以及Rackspace的Hortonworks云服務等都是重量級玩家,Altiscale還需要與Hortonworks、 Cloudera、Mortar Data、Qubole、Xpleny展開直接競爭。

           

          四、Trifacta

          trifacta

          業務:提供平臺幫助用戶將復雜的原始數據轉化成干凈的結構化格式供分析使用。

          簡介:創立于2012年,迄今融資1630萬美元。

          入選理由:大數據技術平臺和分析工具之間存在一個巨大的瓶頸,那就是數據分析專家需要花費大量精力和時間轉化數據,而且業務數據分析師們往往也并不 具備獨立完成數據轉化工作的技術能力。為了解決這個問題Trifacta開發出了“預測互動”技術,將數據操作可視化,而且Trifacta的機器學習算 法還能同時觀察用戶和數據屬性,預測用戶意圖,并自動給出建議。Trifata的競爭對手是Paxata、Informatica和CirroHow。

           

          五、Splice Machine

          splice machine

          業務:提供面向大數據應用的,基于Hadoop的SQL兼容數據庫。

          簡介:創立于2012年,迄今融資1900萬美元。

          入選理由:新的數據技術使得傳統關系型數據庫的一些流行功能如ACID合規、交易一致性和標準的SQL查詢語言等得以在廉價可擴展的Hadoop上 延續。Splice Machine保留了NoSQL數據庫所有的優點,例如auto-sharding,容錯、可擴展性等,同時又保留了SQL。

           

          六、DataTorrent

          datarorrent

          業務:提供基于Hadoop平臺的實時流處理平臺

          簡介:創立于2012年,2013年6月獲得800萬美元A輪融資。

          入選理由:大數據的未來是快數據,而DataTorrent正是要解決快數據的問題。

           

          七、Qubole

          qubole

          業務:提供大數據DaaS服務,基于“真正的自動擴展Hadoop集群”。

          簡介:創立于2011年,累計融資700萬美元。

          入選理由:大數據人才一將難求,對于大多數企業來說,像使用SaaS企業應用一樣使用Hadoop是一個現實的選擇。

           

          八、Continuuity 

          continuuity

          業務:提供基于Hadoop的大數據應用托管平臺

          簡介:創立于2011年,累計獲得1250萬美元融資,創始人兼CEO Todd Papaioannou曾是雅虎副總裁云架構負責人,去年夏天Todd離開Continuuity后,聯合創始人CTO Jonathan Gray接替擔任CEO一職。

          入選理由:Continuuity的商業模式非常聰明也非常獨特,他們繞過非常難纏的Hadoop專家,直接向Java開發者提供應用開發平臺,其 旗艦產品Reactor是一個基于Hadoop的Java集成化數據和應用框架,Continuuity將底層基礎設施進行抽象處理,通過簡單的Java 和REST API提供底層基礎設施服務,為用戶大大簡化了Hadoop基礎設施的復雜性。Continuuity最新發布的服務——Loom是一個集群管理方案,通 過Loom創建的集群可以使用任意硬件和軟件堆疊的模板,從單一的LAMP服務器和傳統應用服務器如JBoss到包含數千個節點的大規模的Hadoop集 群。集群還可以部署在多個云服務商的環境中(例如Rackspace、Joyent、Openstack等)而且還能使用常見的SCM工具。

           

          九、Xplenty

          xplenty

          業務:提供HaaS服務

          簡介:創立于2012年,從Magma風險投資獲得金額不詳的融資。

          入選理由:雖然Hadoop已經成了大數據的事實工業標準,但是Hadoop的開發、部署和維護對技術人員的技能依然有著極高要求。Xplenty 的技術通過無需編寫代碼的Hadoop開發環境提供Hadoop處理服務,企業無需投資軟硬件和專業人才就能快速享受大數據技術。

          十、Nuevora

          nuevora

          業務:提供大數據分析應用

          簡介:創立于2011年,累計獲得300萬早期投資。

          入選理由:Nuevora的著眼點是大數據應用最早啟動的兩個領域:營銷和客戶接觸。Nuevora的nBAAP(大數據分析與應用)平臺的主要功 能包括基于最佳時間預測算法的定制分析應用,nBAAP基于三個關鍵大數據技術:Hadoop(大數據處理)、R(預測分析)和Tableau(數據可視 化)

          posted @ 2014-05-23 12:15 paulwong 閱讀(360) | 評論 (0)編輯 收藏

          12 款最好的 Bootstrap 設計工具

          作為一位設計師,會經常追尋新鮮有趣的設計工具,這些工具會提高工作的效率,使得工作更有效, 最重要的是使工作變得更方便。非常肯定的說,隨著日益增長的工具和應用的數量,設計和開發變得越來越簡單了。其中最普遍使用的最終框架 之一是 Bootstrap,它在 2013 年特別流行。如果你是位設計師,你可能會接觸過它,甚至是使用過它。如果你是 Bootstrap 的使用者或者是相關功能的用戶,這篇文章非常的適合你!

          這里總共列舉了 12 款最好的 Bootstrap 設計工具,這些都能很好的簡化大家的工作。希望大家能從這些列表中找到適合自己的,在評論跟大家分享一下使用的感想和其他類似的工具,Enjoy :)

          12. Bootstrap Designer

          Bootstrap Designer

          Bootstrap Designer 是一個在線工具,不需要下載就可以使用。用戶可以使用它創建漂亮和迷人基于 Bootstrap 框架的 HTML5 模板。

          11. Get Kickstrap

          Get Kickstrap

          如果用戶想結合先進高級的 web 技術和 Bootstrap,可以嘗試一下 Get Kickstrap。這款特別的工具非常高大上,可以運行數據庫驅動的 web 應用程序,而且還不用任何的后臺哦 :D

          10. Bootply

          Bootply

          這個工具擁有非常多樣化的庫,集成了其他 Bootstrap 基礎插件,框架和庫。

          9. Bootstrap Button Generator

          Bootstrap Button Generator

          這個特別的工具能幫助用戶輕松創建各種類型的按鈕,用戶只需要輸入 CSS 類到新的按鈕中,并且選擇相應的顏色值就可以了。當需要使用的時候,只需要復制粘貼就能輕松創建漂亮的按鈕了。 

          8. Easel

          Easel

          這個工具能在瀏覽器中運行,這個非常基礎的工具能幫助在小團隊工作的開發者和設計者建立非常真實的 web 元素。

          7. Layoutit

          Layoutit

          用戶只需要使用拖拽界面構建器就可以輕松創建前端代碼。

          6. Bootswatch

          Bootswatch

          這是一個開源的工具,而且非常容易安裝和使用。

          5. Boottheme

          Boottheme

          這是一款在線主題生成器和 Twitter Bootstrap 的 web 設計工具。用戶可以在設計 webapp 時生成和預覽主題。

          4. Custom Font Tool

          Custom Font Tool

          這個特別的工具允許用戶創建非常個性化的字體,通過非常舒適的命令行方式。

          3. WordPress Twitter Bootstrap CSS

          WordPress Twitter Bootstrap CSS

          這是另外一個基于 CSS 的 Twitter Bootstrap,讓用戶能很好的在 WordPress 網站上使用 Twitter Bootstrap JavaScript 庫。

          2. Bootmetro

          Bootmetro

          這是個非常靈活和簡單的框架,用戶可以創建現代化和經典的 web 應用。同時,也可以定義類似 Windows 8 的風格和感覺。

          1. Flat UI Free–Framework and Bootstrap Theme Design

          Flat UI Free–Framework and Bootstrap Theme Design

          這是另外一個基于 Twitter Bootstrap 的工具,可以運行一個平滑風格,組件可以包含 PSD 文件的 UI。使用這個特殊的工具可以創建出非常有創意的 UI。

          posted @ 2014-05-23 12:13 paulwong 閱讀(335) | 評論 (0)編輯 收藏

          簡單的 6 步配置 MongoDB 復制集

               摘要: 在本次教程中,我們將創建含有三個節點的群集。第一個節點是主節點,第二個節點是Failover節點,第三個節點是仲裁節點。1. 安裝Mongo并設置配置文件安裝3臺服務器并調整配置文件/etc/mongod.conf:1#Select your replication set name2replSet=[replication_set_name]3#Selec...  閱讀全文

          posted @ 2014-05-23 12:11 paulwong 閱讀(320) | 評論 (0)編輯 收藏

          BUTTON樣式

          http://www.oschina.net/p/buttons

          posted @ 2014-05-20 18:19 paulwong 閱讀(316) | 評論 (0)編輯 收藏

          NUTCH+ELASTICSEARCH ON AWS

          Elasticsearch: Indexing SQL databases. The easy way.
          http://blog.comperiosearch.com/blog/2014/01/30/elasticsearch-indexing-sql-databases-the-easy-way/


          Elasticsearch-HBase-River
          https://github.com/mallocator/Elasticsearch-HBase-River

          Elasticsearch river
          http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html#river


          Mongodb river
          http://satishgandham.com/2012/09/a-complete-guide-to-integrating-mongodb-with-elastic-search/
          http://shuminghuang.iteye.com/blog/1829432






          posted @ 2014-05-15 09:14 paulwong 閱讀(746) | 評論 (0)編輯 收藏

          安裝KAFKA

          1. 下載KAFKA
            wget http://apache.fayea.com/apache-mirror/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz

          2. 解壓
            tar -zxf kafka_2.9.2-0.8.1.1.tgz

          3. 0.7之前的版本這時就要安裝相應的包之類的,0.8.1之后就不用了。把命令加進PATH中
            export KAFKA_HOME=/home/ubuntu/java/kafka_2.9.2-0.8.1.1
            export PATH=$JAVA_HOME/bin:$STORM_HOME/bin:$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$BIN_HOME/bin:$MAVEN_HOME/bin:$PATH

          4. SOURCE一下
            source /etc/profile

          5. 制作啟動命令,start-kafka.sh,并放于kafaka_hoem/bin下
            kafka-server-start.sh $KAFKA_HOME/config/server.properties &

          6. 安裝ZOOKEEPER
            wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
            tar -zxf  zookeeper-3.4.6.tar.gz
            cd zookeeper-3.4.6.tar.gz/conf
            cp zoo_sample.cfg zoo.cfg

          7. 改zoo.cfg
            dataDir=$ZOOKEEPER_HOME/data

            #方便查LOG
            dataLogDir=$ZOOKEEPER_HOME/logs

            #控制客戶的連接數,默認數為60,太少
            maxClientCnxns=300

            #如果有多個ZOOKEEPER INSTANCE時
            server.1=10.120.151.223:2888:3888
            server.2=10.120.151.224:2888:3888

          8. 啟動ZOOKEEPER
            zkServer.sh start

          9. 更改KAFKA的配置文件server.properties, 主要改幾個地方
            #這個是配置PRODUCER/CONSUMER連上來的時候使用的地址
            advertised.host.name=54.72.4.92
            #設置KAFKA LOG路徑
            log.dirs=$KAFKA_HOME/logs/kafka-logs
            #設置ZOOKEEPER的連接地址
            zookeeper.connect=54.72.4.92:2181

          10. 啟動KAFKA
            start-kafka.sh

          11. 新建一個TOPIC
            #KAFKA有幾個,replication-factor就填幾個
            kafka-topics.sh --create --topic kafkatopic --replication-factor 1 --partitions 1 --zookeeper localhost:2181

          12. 發送消息至KAFKA
            kafka-console-producer.sh --broker-list localhost:9092 --sync --topic kafkatopic

          13. 另開一個終端,顯示消息的消費
            kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafkatopic --from-beginning

          14. 在發送消息的終端輸入aaa,則可以在消費消息的終端顯示

          posted @ 2014-05-11 10:30 paulwong 閱讀(2745) | 評論 (0)編輯 收藏

          Auto rebalance Storm

          http://stackoverflow.com/questions/15010420/storm-topology-rebalance-using-java-code


          使用Nimbus獲取STORM的信息
          http://www.andys-sundaypink.com/i/retrieve-storm-cluster-statistic-from-nimbus-java-mode/
          TSocket tsocket = new TSocket("localhost", 6627);
          TFramedTransport tTransport = new TFramedTransport(tsocket);
          TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
          Nimbus.Client client = new Nimbus.Client(tBinaryProtocol);
          String topologyId = "test-1-234232567";


          try {

          tTransport.open();
          ClusterSummary clusterSummary = client.getClusterInfo();
          StormTopology stormTopology = client.getTopology(topologyId);
          TopologyInfo topologyInfo = client.getTopologyInfo(topologyId);
          List<ExecutorSummary> executorSummaries = topologyInfo.get_executors();

          List<TopologySummary> topologies = clusterSummary.get_topologies();
          for(ExecutorSummary executorSummary : executorSummaries){

          String id = executorSummary.get_component_id();
          ExecutorInfo executorInfo = executorSummary.get_executor_info();
          ExecutorStats executorStats = executorSummary.get_stats();
          System.out.println("executorSummary :: " + id + " emit size :: " + executorStats.get_emitted_size());
          }
          catch (TTransportException e) {
          e.printStackTrace();
          catch (TException e) {
          e.printStackTrace();
          catch (NotAliveException e) {
          e.printStackTrace();
          }




          posted @ 2014-05-09 23:48 paulwong 閱讀(534) | 評論 (0)編輯 收藏

          淺釋STORM

          STORM是一個消息處理引擎,可以處理源源不斷的進來的消息,這些消息的處理是可以按步驟的。

          處理的方式有各種自定義:

          1. 可自定義消息處理的步驟

          2. 可自定義每種類型的消息需要多少個進程來處理

          3. 每個步驟里的消息是在某個進程里的線程來做處理的

          4. 可自定義每個步驟里的消息的線程數

          5. 可以增加和刪除要處理的消息類型
          如果要處理某種消息了,要怎么辦呢?

          1. 定義數據來源組件(SPOUT)

          2. 定義處理步驟(BOLT)

          3. 組合成一個消息處理流程框架TOPOLOGY

          4. 定義處理消息的進程的數量、定義每個步驟并發時可用的線程數

          5. 部署TOPOLOGY
          當一個TOPOLOGY被部署到STORM時,STORM會查找配置對象的WORKER數量,根據這個數量相應的啟動N個JVM,然后根據每個步驟配置的NUMTASKS生成相應個數的線程,然后每個步驟中配置的數量實例化相應個數的對象,然后就啟動一個線程不斷的執行SPOUT中的nextTuple()方法,如果這個方法中有輸出結果,就啟動另一線程,并在此線程中將這個結果作為參數傳到下一個對象的excue方法中。

          如果此時又有一個步驟BOLT需要執行的話,也是新取一個線程去執行BOLT中的方法啟動的線程不會越過NUMTASKS的數量。



          posted @ 2014-05-09 22:56 paulwong 閱讀(257) | 評論 (0)編輯 收藏

          Storm performance

          The configuration is used to tune various aspects of the running topology. The two configurations specified here are very common:

          1. TOPOLOGY_WORKERS (set with setNumWorkers) specifies how many processes you want allocated around the cluster to execute the topology. Each component in the topology will execute as many threads. The number of threads allocated to a given component is configured through the setBolt and setSpout methods. Those threadsexist within worker processes. Each worker process contains within it some number of threads for some number of components. For instance, you may have 300 threads specified across all your components and 50 worker processes specified in your config. Each worker process will execute 6 threads, each of which of could belong to a different component. You tune the performance of Storm topologies by tweaking the parallelism for each component and the number of worker processes those threads should run within.
          2. TOPOLOGY_DEBUG (set with setDebug), when set to true, tells Storm to log every message every emitted by a component. This is useful in local mode when testing topologies, but you probably want to keep this turned off when running topologies on the cluster.

          There's many other configurations you can set for the topology. The various configurations are detailed on the Javadoc for Config.


          Common configurations


          There are a variety of configurations you can set per topology. A list of all the configurations you can set can be found here. The ones prefixed with "TOPOLOGY" can be overridden on a topology-specific basis (the other ones are cluster configurations and cannot be overridden). Here are some common ones that are set for a topology:

          1. Config.TOPOLOGY_WORKERS: This sets the number of worker processes to use to execute the topology. For example, if you set this to 25, there will be 25 Java processes across the cluster executing all the tasks. If you had a combined 150 parallelism across all components in the topology, each worker process will have 6 tasks running within it as threads.
          2. Config.TOPOLOGY_ACKERS: This sets the number of tasks that will track tuple trees and detect when a spout tuple has been fully processed. Ackers are an integral part of Storm's reliability model and you can read more about them onGuaranteeing message processing.
          3. Config.TOPOLOGY_MAX_SPOUT_PENDING: This sets the maximum number of spout tuples that can be pending on a single spout task at once (pending means the tuple has not been acked or failed yet). It is highly recommended you set this config to prevent queue explosion.
          4. Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: This is the maximum amount of time a spout tuple has to be fully completed before it is considered failed. This value defaults to 30 seconds, which is sufficient for most topologies. SeeGuaranteeing message processing for more information on how Storm's reliability model works.
          5. Config.TOPOLOGY_SERIALIZATIONS: You can register more serializers to Storm using this config so that you can use custom types within tuples.

          Reference:
          http://storm.incubator.apache.org/documentation/Running-topologies-on-a-production-cluster.html

          storm rebalance 命令調整topology并行數及問題分析
          http://blog.csdn.net/jmppok/article/details/17243857

          flume+kafka+storm+mysql 數據流
          http://blog.csdn.net/jmppok/article/details/17259145



          http://storm.incubator.apache.org/documentation/Tutorial.html

          posted @ 2014-05-08 09:19 paulwong 閱讀(277) | 評論 (0)編輯 收藏

          KMEANS PAGERANK ON HADOOP

          https://github.com/keokilee/kmeans-hadoop

          https://github.com/rorlig/hadoop-pagerank-java

          http://wuyanzan60688.blog.163.com/blog/static/12777616320131011426159/

          http://codecloud.net/hadoop-k-means-591.html


          import java.io.*;
          import java.net.URI;
          import java.util.Iterator;
          import java.util.Random;
          import java.util.Vector;

          import org.apache.hadoop.filecache.DistributedCache;
          import org.apache.hadoop.fs.FileSystem;
          import org.apache.hadoop.fs.Path;
          import org.apache.hadoop.io.*;
          import org.apache.hadoop.mapred.*;
          import org.apache.hadoop.util.GenericOptionsParser;

          public class KMeans {
              static enum Counter { CENTERS, CHANGE, ITERATIONS }

              public static class Point implements WritableComparable<Point> {
                  // Longs because this will store sum of many ints
                  public LongWritable x;
                  public LongWritable y;
                  public IntWritable num; // For summation points

                  public Point() {
                      this.x = new LongWritable(0);
                      this.y = new LongWritable(0);
                      this.num = new IntWritable(0);
                  }

                  public Point(int x, int y) {
                      this.x = new LongWritable(x);
                      this.y = new LongWritable(y);
                      this.num = new IntWritable(1);
                  }

                  public Point(IntWritable x, IntWritable y) {
                      this.x = new LongWritable(x.get());
                      this.y = new LongWritable(y.get());
                      this.num = new IntWritable(1);
                  }

                  public void add(Point that) {
                      x.set(x.get() + that.x.get());
                      y.set(y.get() + that.y.get());
                      num.set(num.get() + that.num.get());
                  }

                  public void norm() {
                      x.set(x.get() / num.get());
                      y.set(y.get() / num.get());
                      num.set(1);
                  }

                  public void write(DataOutput out) throws IOException {
                      x.write(out);
                      y.write(out);
                      num.write(out);
                  }

                  public void readFields(DataInput in) throws IOException {
                      x.readFields(in);
                      y.readFields(in);
                      num.readFields(in);
                  }

                  public long distance(Point that) {
                      long dx = that.x.get() - x.get();
                      long dy = that.y.get() - y.get();

                      return dx * dx + dy * dy;
                  }

                  public String toString() {
                      String ret = x.toString() + '\t' + y.toString();
                      if (num.get() != 1)
                          ret += '\t' + num.toString();
                      return ret;
                  }

                  public int compareTo(Point that) {
                      int ret = x.compareTo(that.x);
                      if (ret == 0)
                          ret = y.compareTo(that.y);
                      if (ret == 0)
                          ret = num.compareTo(that.num);
                      return ret;
                  }
              }

              public static class Map
                      extends MapReduceBase
                      implements Mapper<Text, Text, Point, Point>
              {
                  private Vector<Point> centers;
                  private IOException error;

                  public void configure(JobConf conf) {
                      try {
                          Path paths[] = DistributedCache.getLocalCacheFiles(conf);
                          if (paths.length != 1)
                              throw new IOException("Need exactly 1 centers file");

                          FileSystem fs = FileSystem.getLocal(conf);
                          SequenceFile.Reader in = new SequenceFile.Reader(fs, paths[0], conf);

                          centers = new Vector<Point>();
                          IntWritable x = new IntWritable();
                          IntWritable y = new IntWritable();
                          while(in.next(x, y))
                              centers.add(new Point(x, y));
                          in.close();

                          // Generate new points if we don't have enough.
                          int k = conf.getInt("k", 0);
                          Random rand = new Random();
                          final int MAX = 1024*1024;
                          for (int i = centers.size(); i < k; i++) {
                              x.set(rand.nextInt(MAX));
                              y.set(rand.nextInt(MAX));
                              centers.add(new Point(x, y));
                          }
                      } catch (IOException e) {
                          error = e;
                      }
                  }

                  public void map(Text xt, Text yt,
                          OutputCollector<Point, Point> output, Reporter reporter)
                      throws IOException
                  {
                      if (error != null)
                          throw error;

                      int x = Integer.valueOf(xt.toString());
                      int y = Integer.valueOf(yt.toString());
                      Point p = new Point(x, y);
                      Point center = null;
                      long distance = Long.MAX_VALUE;

                      for (Point c : centers) {
                          long d = c.distance(p);
                          if (d <= distance) {
                              distance = d;
                              center = c;
                          }
                      }

                      output.collect(center, p);
                  }
              }

              public static class Combine
                      extends MapReduceBase
                      implements Reducer<Point, Point, Point, Point>
              {
                  public void reduce(Point center, Iterator<Point> points,
                          OutputCollector<Point, Point> output, Reporter reporter)
                      throws IOException
                  {
                      Point sum = new Point();
                      while(points.hasNext()) {
                          sum.add(points.next());
                      }

                      output.collect(center, sum);
                  }
              }

              public static class Reduce
                      extends MapReduceBase
                      implements Reducer<Point, Point, IntWritable, IntWritable>
              {
                  public void reduce(Point center, Iterator<Point> points,
                          OutputCollector<IntWritable, IntWritable> output,
                          Reporter reporter)
                      throws IOException
                  {
                      Point sum = new Point();
                      while (points.hasNext()) {
                          sum.add(points.next());
                      }
                      sum.norm();

                      IntWritable x = new IntWritable((int) sum.x.get());
                      IntWritable y = new IntWritable((int) sum.y.get());

                      output.collect(x, y);

                      reporter.incrCounter(Counter.CHANGE, sum.distance(center));
                      reporter.incrCounter(Counter.CENTERS, 1);
                  }
              }

              public static void error(String msg) {
                  System.err.println(msg);
                  System.exit(1);
              }

              public static void initialCenters(
                      int k, JobConf conf, FileSystem fs,
                      Path in, Path out)
                  throws IOException
              {
                  BufferedReader input = new BufferedReader(
                          new InputStreamReader(fs.open(in)));
                  SequenceFile.Writer output = new SequenceFile.Writer(
                          fs, conf, out, IntWritable.class, IntWritable.class);
                  IntWritable x = new IntWritable();
                  IntWritable y = new IntWritable();
                  for (int i = 0; i < k; i++) {
                      String line = input.readLine();
                      if (line == null)
                          error("Not enough points for number of means");

                      String parts[] = line.split("\t");
                      if (parts.length != 2)
                          throw new IOException("Found a point without two parts");

                      x.set(Integer.valueOf(parts[0]));
                      y.set(Integer.valueOf(parts[1]));
                      output.append(x, y);
                  }
                  output.close();
                  input.close();
              }

              public static void main(String args[]) throws IOException {
                  JobConf conf = new JobConf(KMeans.class);
                  GenericOptionsParser opts = new GenericOptionsParser(conf, args);
                  String paths[] = opts.getRemainingArgs();

                  FileSystem fs = FileSystem.get(conf);

                  if (paths.length < 3)
                      error("Usage:\n"
                              + "\tKMeans <file to display>\n"
                              + "\tKMeans <output> <k> <input file>"
                           );

                  Path outdir  = new Path(paths[0]);
                  int k = Integer.valueOf(paths[1]);
                  Path firstin = new Path(paths[2]);
                  
                  if (k < 1 || k > 20)
                      error("Strange number of means: " + paths[1]);

                  if (fs.exists(outdir)) {
                      if (!fs.getFileStatus(outdir).isDir())
                          error("Output directory \"" + outdir.toString()
                                  + "\" exists and is not a directory.");
                  } else {
                      fs.mkdirs(outdir);
                  }

                  // Input: text file, each line "x\ty"
                  conf.setInputFormat(KeyValueTextInputFormat.class);
                  for (int i = 2; i < paths.length; i++)
                      FileInputFormat.addInputPath(conf, new Path(paths[i]));

                  conf.setInt("k", k);

                  // Map: (x,y) -> (centroid, point)
                  conf.setMapperClass(Map.class);
                  conf.setMapOutputKeyClass(Point.class);
                  conf.setMapOutputValueClass(Point.class);

                  // Combine: (centroid, points) -> (centroid, weighted point)
                  conf.setCombinerClass(Combine.class);

                  // Reduce: (centroid, weighted points) -> (x, y) new centroid
                  conf.setReducerClass(Reduce.class);
                  conf.setOutputKeyClass(IntWritable.class);
                  conf.setOutputValueClass(IntWritable.class);

                  // Output
                  conf.setOutputFormat(SequenceFileOutputFormat.class);

                  // Chose initial centers
                  Path centers = new Path(outdir, "initial.seq");
                  initialCenters(k, conf, fs, firstin, centers);

                  // Iterate
                  long change  = Long.MAX_VALUE;
                  URI cache[] = new URI[1];
                  for (int iter = 1; iter <= 1000 && change > 100 * k; iter++) {
                      Path jobdir = new Path(outdir, Integer.toString(iter));
                      FileOutputFormat.setOutputPath(conf, jobdir);

                      conf.setJobName("k-Means " + iter);
                      conf.setJarByClass(KMeans.class);

                      cache[0] = centers.toUri();
                      DistributedCache.setCacheFiles( cache, conf );

                      RunningJob result = JobClient.runJob(conf);
                      System.out.println("Iteration: " + iter);

                      change   = result.getCounters().getCounter(Counter.CHANGE);
                      centers  = new Path(jobdir, "part-00000");
                  }
              }
          }

          192.5.53.208

          posted @ 2014-05-07 23:57 paulwong 閱讀(391) | 評論 (0)編輯 收藏

          僅列出標題
          共115頁: First 上一頁 53 54 55 56 57 58 59 60 61 下一頁 Last 
          主站蜘蛛池模板: 西贡区| 宝应县| 黄山市| 十堰市| 白玉县| 龙门县| 苍南县| 南充市| 苗栗市| 东丰县| 南江县| 盘山县| 辽阳县| 法库县| 孝义市| 巴林右旗| 奉节县| 南丰县| 宁化县| 车致| 工布江达县| 姜堰市| 河北省| 石泉县| 宁乡县| 岱山县| 志丹县| 平阴县| 沧州市| 嘉祥县| 襄汾县| 崇仁县| 岐山县| 阜新市| 高安市| 南漳县| 上思县| 盐山县| 石家庄市| 浦城县| 三明市|