
          Cumulative (running-total) statistics in Spark rely mainly on window functions, while totals across all dimensions require the rollup function.

          1 Use cases:

            1. We need to compute each user's total usage duration (a running total over history).

            2. The front-end page needs to query along several dimensions, such as product and region.

            3. The table header to display looks like: product, 2015-04, 2015-05, 2015-06.

          2 Raw data:

          product_code |event_date |duration |
          -------------|-----------|---------|
          1438         |2016-05-13 |165      |
          1438         |2016-05-14 |595      |
          1438         |2016-05-15 |105      |
          1629         |2016-05-13 |12340    |
          1629         |2016-05-14 |13850    |
          1629         |2016-05-15 |227      |

          3 Implementing the business scenarios

          3.1 Scenario 1: cumulative history

          As the source data shows, we already have each day's usage duration. When computing the statistics, we want the 14th to include the 13th, the 15th to include the 14th and the 13th, and so on.
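The accumulation described here can be sketched outside Spark with plain Scala collections. This only illustrates the arithmetic, not how Spark executes a window function; the `Usage` case class and the `runningTotals` helper are hypothetical names introduced for this sketch, and the rows are the sample data from section 2.

```scala
// Illustrative only: plain Scala collections, no Spark required.
// `Usage` and `runningTotals` are hypothetical names for this sketch.
object RunningTotalSketch {
  final case class Usage(pcode: Int, eventDate: String, duration: Long)

  // Sample rows from section 2.
  val rows: Seq[Usage] = Seq(
    Usage(1438, "2016-05-13", 165L),
    Usage(1438, "2016-05-14", 595L),
    Usage(1438, "2016-05-15", 105L),
    Usage(1629, "2016-05-13", 12340L),
    Usage(1629, "2016-05-14", 13850L),
    Usage(1629, "2016-05-15", 227L)
  )

  // For each product, sort by date and carry a running sum forward.
  // ISO-8601 date strings sort chronologically when compared as text.
  def runningTotals(data: Seq[Usage]): Seq[(Int, String, Long)] =
    data.groupBy(_.pcode).toSeq.sortBy(_._1).flatMap { case (pcode, group) =>
      val sorted = group.sortBy(_.eventDate)
      // scanLeft emits the initial 0 first, so drop it with tail.
      val sums = sorted.scanLeft(0L)((acc, u) => acc + u.duration).tail
      sorted.zip(sums).map { case (u, total) => (pcode, u.eventDate, total) }
    }
}
```

Calling `RunningTotalSketch.runningTotals(RunningTotalSketch.rows)` yields one (pcode, date, running total) tuple per input row, grouped by product in date order.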

          3.1.1 Spark SQL implementation

          // Spark SQL: accumulate historical data with a window function
          sqlContext.sql(
          """
            select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration
            from userlogs_date
          """).show
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         760|
          | 1438|2016-05-15|         865|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       26190|
          | 1629|2016-05-15|       26417|
          +-----+----------+------------+

          3.1.2 DataFrame implementation

          // Use the over function provided by Column, passing in the window specification
          import org.apache.spark.sql.expressions._
          import org.apache.spark.sql.functions.sum
          val first_2_now_window = Window.partitionBy("pcode").orderBy("event_date")
          df_userlogs_date.select(
              $"pcode",
              $"event_date",
              sum($"duration").over(first_2_now_window).as("sum_duration")
          ).show
          
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         760|
          | 1438|2016-05-15|         865|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       26190|
          | 1629|2016-05-15|       26417|
          +-----+----------+------------+

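          3.1.3 Extension: accumulating over a range

          Real-world accumulation logic is far more complex than the above: for example, summing the previous N days, or from N days before to N days after. The following implements these cases:

          3.1.3.1 Accumulate all history: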

          select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
          select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
          Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,0)
          Window.partitionBy("pcode").orderBy("event_date")
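          For this data, the four forms above give identical results. Strictly speaking, when order by is given without an explicit frame, the default frame is range between unbounded preceding and current row, which also includes any rows that tie with the current row on event_date. It matches the rows frame here because, as in the sample data, each pcode has one row per event_date.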

          3.1.3.2 Accumulate the previous N days (rows) through the current row, assuming N=3
          select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and current row) as sum_duration from userlogs_date
          Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,0)
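One caveat worth spelling out: `rows between 3 preceding and current row` counts physical rows, not calendar days, so "previous N days" and "previous N rows" only coincide when a partition has exactly one row per day with no gaps. The plain-Scala sketch below contrasts the two. The helpers `sumLastNRows` and `sumLastNDays` are hypothetical, and the series is made-up illustration data (not from the article) with two days deliberately missing.

```scala
import java.time.LocalDate

// Illustrative only: contrasts a row-based frame with a date-based one.
// All names and the sample series are assumptions made for this sketch.
object RowsVersusDaysSketch {
  // One product's daily values, sorted by date; 2016-05-14 and 2016-05-15 are missing on purpose.
  val series: Seq[(LocalDate, Long)] = Seq(
    LocalDate.parse("2016-05-12") -> 10L,
    LocalDate.parse("2016-05-13") -> 20L,
    LocalDate.parse("2016-05-16") -> 30L,
    LocalDate.parse("2016-05-17") -> 40L
  )

  // Row-based frame: the current row plus up to n physically preceding rows.
  def sumLastNRows(data: Seq[(LocalDate, Long)], n: Int): Seq[Long] =
    data.indices.map { i =>
      data.slice(math.max(0, i - n), i + 1).map(_._2).sum
    }

  // Date-based frame: every row dated within the n days up to and including the current date.
  def sumLastNDays(data: Seq[(LocalDate, Long)], n: Int): Seq[Long] =
    data.map { case (day, _) =>
      val from = day.minusDays(n.toLong)
      data.collect { case (d, v) if !d.isBefore(from) && !d.isAfter(day) => v }.sum
    }
}
```

With n = 3 the two helpers agree on the first two rows and diverge after the gap. In Spark, one common way to get date-based behaviour is a range frame over a numeric ordering expression rather than a rows frame; whether that is needed depends on whether the data can have missing days.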

          3.1.3.3 Accumulate N days before and M days after, assuming N=3 and M=5

          select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 3 preceding and 5 following ) as sum_duration from userlogs_date
          Window.partitionBy("pcode").orderBy("event_date").rowsBetween(-3,5)

          3.1.3.4 Accumulate all rows in the partition
          select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
          Window.partitionBy("pcode").orderBy("event_date").rowsBetween(Long.MinValue,Long.MaxValue)

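           Summary:

          preceding: accumulates the N rows before the current row (within the partition). To start from the first row of the partition, use unbounded. N is the offset backward from the current row.
          following: the opposite of preceding; accumulates the N rows after the current row (within the partition). To run through the end of the partition, use unbounded. N is the offset forward from the current row.
          current row: as the name suggests, the current row, with offset 0.
          Note: the N preceding, M following, and current row boundaries are all inclusive, so the row at each offset is itself part of the sum.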
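The frame rules summarized above can be mimicked in a few lines of plain Scala. This is a sketch of the semantics only, not Spark's implementation; `frameSum` is a hypothetical helper whose `start` and `end` arguments follow the same convention as `rowsBetween` in this article (negative for preceding, positive for following, `Long.MinValue` and `Long.MaxValue` for unbounded).

```scala
// Illustrative only: a row-based window frame over one partition's values.
// `frameSum` is a hypothetical helper, not part of any Spark API.
object FrameSumSketch {
  // For every index i, sum the rows from i + start to i + end (both inclusive),
  // clipped to the partition. Long.MinValue / Long.MaxValue mean "unbounded".
  def frameSum(values: Seq[Long], start: Long, end: Long): Seq[Long] = {
    val last = values.length - 1
    values.indices.map { i =>
      val lo = if (start == Long.MinValue) 0L else math.max(0L, i + start)
      val hi = if (end == Long.MaxValue) last.toLong else math.min(last.toLong, i + end)
      // An empty frame yields 0 here; SQL's sum would yield null instead.
      if (lo > hi) 0L else values.slice(lo.toInt, hi.toInt + 1).sum
    }
  }

  // Durations of product 1438 from section 2, already in date order.
  val durations1438: Seq[Long] = Seq(165L, 595L, 105L)
}
```

For example, `frameSum(durations1438, -1, 0)` corresponds to `rows between 1 preceding and current row`, and `frameSum(durations1438, Long.MinValue, Long.MaxValue)` to the whole-partition frame.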

          3.1.3.5 Measured results
          Cumulative history (the current day and all earlier days in the partition), form 1: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc) as sum_duration from userlogs_date
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         760|
          | 1438|2016-05-15|         865|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       26190|
          | 1629|2016-05-15|       26417|
          +-----+----------+------------+
          Cumulative history (the current day and all earlier days in the partition), form 2: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and current row) as sum_duration from userlogs_date
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         760|
          | 1438|2016-05-15|         865|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       26190|
          | 1629|2016-05-15|       26417|
          +-----+----------+------------+
          Current day plus the previous day: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and current row) as sum_duration from userlogs_date
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         760|
          | 1438|2016-05-15|         700|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       26190|
          | 1629|2016-05-15|       14077|
          +-----+----------+------------+
          Current day, previous day, and next day: select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between 1 preceding and 1 following ) as sum_duration from userlogs_date
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         760|
          | 1438|2016-05-14|         865|
          | 1438|2016-05-15|         700|
          | 1629|2016-05-13|       26190|
          | 1629|2016-05-14|       26417|
          | 1629|2016-05-15|       14077|
          +-----+----------+------------+
          All rows in the partition (the current day plus everything before and after it): select pcode,event_date,sum(duration) over (partition by pcode order by event_date asc rows between unbounded preceding and unbounded following ) as sum_duration from userlogs_date
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | 1438|2016-05-13|         865|
          | 1438|2016-05-14|         865|
          | 1438|2016-05-15|         865|
          | 1629|2016-05-13|       26417|
          | 1629|2016-05-14|       26417|
          | 1629|2016-05-15|       26417|
          +-----+----------+------------+
          3.2 Scenario 2: totals across all dimensions

          3.2.1 Spark SQL implementation

          // Spark SQL: use rollup to add the "all" subtotals and the grand total
          sqlContext.sql(
          """
            select pcode,event_date,sum(duration) as sum_duration
            from userlogs_date_1
            group by pcode,event_date with rollup
            order by pcode,event_date
          """).show()
          
          +-----+----------+------------+                                                 
          |pcode|event_date|sum_duration|
          +-----+----------+------------+
          | null|      null|       27282|
          | 1438|      null|         865|
          | 1438|2016-05-13|         165|
          | 1438|2016-05-14|         595|
          | 1438|2016-05-15|         105|
          | 1629|      null|       26417|
          | 1629|2016-05-13|       12340|
          | 1629|2016-05-14|       13850|
          | 1629|2016-05-15|         227|
          +-----+----------+------------+

          3.2.2 DataFrame implementation

          // Use the rollup function provided by DataFrame to compute "all" totals across several dimensions
          df_userlogs_date.rollup($"pcode", $"event_date").agg(sum($"duration")).orderBy($"pcode", $"event_date").show()
          
          +-----+----------+-------------+                                                
          |pcode|event_date|sum(duration)|
          +-----+----------+-------------+
          | null|      null|        27282|
          | 1438|      null|          865|
          | 1438|2016-05-13|          165|
          | 1438|2016-05-14|          595|
          | 1438|2016-05-15|          105|
          | 1629|      null|        26417|
          | 1629|2016-05-13|        12340|
          | 1629|2016-05-14|        13850|
          | 1629|2016-05-15|          227|
          +-----+----------+-------------+
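To make the rollup output easier to read: rolling up by (pcode, event_date) amounts to aggregating at three levels, per product and date, per product, and overall, then stacking the results. The plain-Scala sketch below reproduces that on the sample data. It illustrates the semantics only; the `Usage` case class and this `rollup` helper are hypothetical names, and `None` stands in for the null that Spark prints in a rolled-up column.

```scala
// Illustrative only: plain Scala collections, no Spark required.
// `Usage` and `rollup` are hypothetical names for this sketch.
object RollupSketch {
  final case class Usage(pcode: Int, eventDate: String, duration: Long)

  // Sample rows from section 2.
  val rows: Seq[Usage] = Seq(
    Usage(1438, "2016-05-13", 165L),
    Usage(1438, "2016-05-14", 595L),
    Usage(1438, "2016-05-15", 105L),
    Usage(1629, "2016-05-13", 12340L),
    Usage(1629, "2016-05-14", 13850L),
    Usage(1629, "2016-05-15", 227L)
  )

  // Aggregate at the levels (pcode, event_date), (pcode), and () and stack them.
  def rollup(data: Seq[Usage]): Seq[(Option[Int], Option[String], Long)] = {
    val byProductAndDate = data.groupBy(u => (u.pcode, u.eventDate)).toSeq.map {
      case ((p, d), group) => (Option(p), Option(d), group.map(_.duration).sum)
    }
    val byProduct = data.groupBy(_.pcode).toSeq.map {
      case (p, group) => (Option(p), Option.empty[String], group.map(_.duration).sum)
    }
    val grandTotal = Seq((Option.empty[Int], Option.empty[String], data.map(_.duration).sum))
    // None sorts before Some, so each subtotal row precedes its detail rows.
    (grandTotal ++ byProduct ++ byProductAndDate).sortBy { case (p, d, _) => (p, d) }
  }
}
```

The first row returned is the grand total, followed by each product's subtotal and then its per-date rows, which is the same ordering as the tables above.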

            3.3 Rows to columns: pivot

          At the time of writing, pivot had no SQL syntax (later Spark releases added a PIVOT clause), so the DataFrame API is used here.
          val userlogs_date_all = sqlContext.sql("select dcode, pcode,event_date,sum(duration) as duration from userlogs group by dcode, pcode,event_date ")
          userlogs_date_all.registerTempTable("userlogs_date_all")
          val dates = userlogs_date_all.select($"event_date").map(row => row.getAs[String]("event_date")).distinct().collect().toList
          userlogs_date_all.groupBy($"dcode", $"pcode").pivot("event_date", dates).sum("duration").na.fill(0).show
          
          +-----------------+-----+----------+----------+----------+----------+
          |            dcode|pcode|2016-05-26|2016-05-13|2016-05-14|2016-05-15|
          +-----------------+-----+----------+----------+----------+----------+
          |         F2429186| 1438|         0|         0|       227|         0|
          |        AI2342441| 1438|         0|         0|         0|       345|
          |       A320018711| 1438|         0|       939|         0|         0|
          |         H2635817| 1438|         0|       522|         0|         0|
          |         D0288196| 1438|         0|       101|         0|         0|
          |         Y0242218| 1438|         0|      1036|         0|         0|
          |         H2392574| 1438|         0|         0|       689|         0|
          |         D2245588| 1438|         0|         0|         1|         0|
          |         Y2514906| 1438|         0|         0|       118|         4|
          |         H2540419| 1438|         0|       465|       242|         5|
          |         R2231926| 1438|         0|         0|       305|         0|
          |         H2684591| 1438|         0|       136|         0|         0|
          |         A2548470| 1438|         0|       412|         0|         0|
          |         GH000309| 1438|         0|         0|         0|         4|
          |         H2293216| 1438|         0|         0|         0|       534|
          |         R2170601| 1438|         0|         0|         0|         0|
          |B2365238;B2559538| 1438|         0|         0|         0|         0|
          |         BQ005465| 1438|         0|         0|       642|        78|
          |        AH2180324| 1438|         0|       608|       146|        36|
          |         H0279306| 1438|         0|       490|         0|         0|
          +-----------------+-----+----------+----------+----------+----------+
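What pivot does here can be sketched in plain Scala: collect the distinct dates as column names, then emit one row per key with one cell per date, filling 0 where there is no data (the role of `na.fill(0)`). This is an illustration only, keyed by pcode alone for brevity; `Usage` and this `pivot` helper are hypothetical names, and the rows are the sample data from section 2 rather than the `userlogs` table.

```scala
// Illustrative only: plain Scala collections, no Spark required.
// `Usage` and `pivot` are hypothetical names for this sketch.
object PivotSketch {
  final case class Usage(pcode: Int, eventDate: String, duration: Long)

  // Sample rows from section 2.
  val rows: Seq[Usage] = Seq(
    Usage(1438, "2016-05-13", 165L),
    Usage(1438, "2016-05-14", 595L),
    Usage(1438, "2016-05-15", 105L),
    Usage(1629, "2016-05-13", 12340L),
    Usage(1629, "2016-05-14", 13850L),
    Usage(1629, "2016-05-15", 227L)
  )

  // Returns the date columns plus, per product, one summed cell per date.
  def pivot(data: Seq[Usage]): (Seq[String], Map[Int, Seq[Long]]) = {
    // Sorting the distinct dates keeps the columns in chronological order.
    val dates = data.map(_.eventDate).distinct.sorted
    val cells = data.groupBy(u => (u.pcode, u.eventDate)).map {
      case (key, group) => key -> group.map(_.duration).sum
    }
    val table = data.map(_.pcode).distinct.map { p =>
      // Missing (product, date) combinations become 0.
      p -> dates.map(d => cells.getOrElse((p, d), 0L))
    }.toMap
    (dates, table)
  }
}
```

The date columns in the article's output are not in chronological order, most likely because the distinct dates were collected without sorting; sorting the list before passing it to pivot, as this sketch does, is one way to get ordered columns.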

           

           

          Appendix

          Below is the official API documentation for these two functions:

          org.apache.spark.sql.scala
          def rollup(col1: String, cols: String*): GroupedData
          Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
          This is a variant of rollup that can only group by existing columns using column names (i.e. cannot construct expressions).
          
          // Compute the average for all numeric columns rolluped by department and group.
          df.rollup("department", "group").avg()
          
          // Compute the max age and average salary, rolluped by department and gender.
          df.rollup($"department", $"gender").agg(Map(
            "salary" -> "avg",
            "age" -> "max"
          ))
          def rollup(cols: Column*): GroupedData
          Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.
          df.rollup($"department", $"group").avg()
          
          // Compute the max age and average salary, rolluped by department and gender.
          df.rollup($"department", $"gender").agg(Map(
            "salary" -> "avg",
            "age" -> "max"
          ))
          org.apache.spark.sql.Column.scala
          def over(window: WindowSpec): Column
          Define a windowing column.
          
          val w = Window.partitionBy("name").orderBy("id")
          df.select(
            sum("price").over(w.rangeBetween(Long.MinValue, 2)),
            avg("price").over(w.rowsBetween(0, 4))
          )
          posted on 2017-10-23 22:05 by xzc | Category: hadoop