欢迎光临散文网 会员登陆 & 注册

电商数据处理_指标计算

2023-03-05 16:45 作者:_邪月大魔王  | 我要投稿

package matchs.praction.commerce

import org.apache.spark.sql.{SaveMode, SparkSession}

object IndexCompute {
 def main(args: Array[String]): Unit = {

   val spark = SparkSession.builder()
     .master("local[*]").appName("Compute")
     .enableHiveSupport().getOrCreate()

   /*

   推荐使用网页端观看,移动端灾难现场

   注:与订单金额计算相关使用order_money字段,同一个订单无需多次重复计算,需要考虑退款或者取消的订单
   第一题:
  1、 根据dwd或者dws层表统计每人每天下单的数量和下单的总金额,
     存入dws层(需自建)的user_consumption_day_aggr表中(表结构如下),
     然后使用hive cli按照客户主键、订单总金额均为降序排序,查询出前5条;
       字段 类型 中文含义 备注
       customer_id  int  客户主键 customer_id
       customer_name  string 客户名称 customer_name
       total_amount double 订单总金额  当天订单总金额
       total_count  int  订单总数 当天订单总数
       year int  年  订单产生的年,为动态分区字段
       month  int  月  订单产生的月,为动态分区字段
       day  int  日  订单产生的日,为动态分区字段
    */

   spark.sql(
     """
       |INSERT INTO TABLE n_dws.user_consumption_day_aggr
       |SELECT
       |fact.customer_id,
       |fact.customer_name,
       |SUM(fact.order_money) AS total_amount,
       |COUNT(1) AS total_count,
       |year,
       |month,
       |day
       |FROM(
       |     SELECT
       |     DISTINCT
       |     CAST(fact.customer_id AS INT),
       |     dim.customer_name,
       |     fact.order_money,
       |     CAST(SUBSTRING(fact.etl_date, 1, 4) AS INT) AS year,
       |     CAST(SUBSTRING(fact.etl_date, 5, 2) AS INT) AS month,
       |     CAST(SUBSTRING(fact.etl_date, 7, 2) AS INT) AS day
       |     FROM n_dwd.fact_order_master fact JOIN n_dwd.dim_customer_inf dim
       |     ON fact.order_status = '已下单'
       |     AND fact.customer_id = dim.customer_id
       |     AND fact.order_sn NOT IN(
       |                                  SELECT order_sn
       |                                  FROM n_dwd.fact_order_master
       |                                  WHERE order_status = '已退款'
       |                                  )
       |     )fact
       |GROUP BY fact.customer_id, fact.customer_name,
       |fact.year, fact.month, fact.day
       |""".stripMargin)

   spark.sql("SELECT * FROM n_dws.user_consumption_day_aggr ORDER BY customer_id DESC, total_amount DESC LIMIT 5").show()

   /*
   +-----------+-------------+------------+-----------+----+-----+---+
   |customer_id|customer_name|total_amount|total_count|year|month|day|
   +-----------+-------------+------------+-----------+----+-----+---+
   |      19999|       吉秀芳|     2967.75|          1|2022|    4| 15|
   |      19999|       吉秀芳|      775.75|          1|2022|    5|  7|
   |      19998|       赵丹丹|     7215.19|          1|2022|    4|  6|
   |      19998|       赵丹丹|     3303.48|          1|2022|    4|  2|
   |      19997|         解娜|    11474.22|          1|2022|    3| 30|
   +-----------+-------------+------------+-----------+----+-----+---+
    */


   /*
   第二题:
   2、 根据dwd或者dws层表统计每个城市每月下单的数量和下单的总金额(以order_master中的地址为判断依据),
      并按照province_name,year,month进行分组,按照total_amount逆序排序,形成sequence值,
      将计算结果存入Hive的dws数据库city_consumption_day_aggr表中(表结构如下),
      然后使用hive cli根据订单总数、订单总金额均为降序排序,查询出前5条,
      在查询时对于订单总金额字段将其转为bigint类型(避免用科学计数法展示);
         字段 类型 中文含义 备注
         city_name  string 城市名称
         province_name  string 省份名称
         total_amount double 订单总金额  当月订单总金额
         total_count  int  订单总数 当月订单总数
         sequence int  次序 即当月中该城市消费额在该省中的排名
         year int  年  订单产生的年,为动态分区字段
         month  int  月  订单产生的月,为动态分区字段
    */

   spark.sql(
     """
       |INSERT INTO TABLE n_dws.city_consumption_day_aggr
       |SELECT
       |city AS city_name,
       |province AS province_name,
       |CAST(SUM(order_money) AS BIGINT) AS total_amount,
       |CAST(COUNT(order_money) AS INT) AS total_count,
       |CAST(ROW_NUMBER() OVER(ORDER BY SUM(order_money) DESC) AS INT) AS sequence,
       |CAST(SUBSTRING(etl_date, 1, 4) AS INT) AS `year`,
       |CAST(SUBSTRING(etl_date, 5, 2) AS INT) AS `month`
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                     SELECT order_sn
       |                     FROM n_dwd.fact_order_master
       |                     WHERE order_status = '已退款'
       |                     )
       |GROUP BY city, province,
       |SUBSTRING(etl_date, 1, 4),
       |SUBSTRING(etl_date, 5, 2)
       |""".stripMargin)

   // 优化

/*    spark.sql(
     """
       |SELECT city, province, SUM(order_money) sum_order, COUNT(1) cnt,
       |CAST(SUBSTRING(etl_date, 1, 4) AS INT) AS `year`,
       |CAST(SUBSTRING(etl_date, 5, 2) AS INT) AS `month`
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已退款'
       |GROUP BY city, province,
       |CAST(SUBSTRING(etl_date, 1, 4) AS INT),
       |CAST(SUBSTRING(etl_date, 5, 2) AS INT)
       |""".stripMargin).createTempView("de_order")

   spark.sql(
     """
       |SELECT city, province, SUM(order_money) sum_order, COUNT(1) cnt,
       |CAST(SUBSTRING(etl_date, 1, 4) AS INT) AS `year`,
       |CAST(SUBSTRING(etl_date, 5, 2) AS INT) AS `month`
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |GROUP BY city, province,
       |CAST(SUBSTRING(etl_date, 1, 4) AS INT),
       |CAST(SUBSTRING(etl_date, 5, 2) AS INT)
       |""".stripMargin).createTempView("or_order")*/

   spark.sql(
     """
       |SELECT
       |or_order.city AS city_name,
       |or_order.province AS province_name,
       |CAST((or_order.sum_order - de_order.sum_order) AS BIGINT) AS total_amount,
       |CAST((or_order.cnt - de_order.cnt) AS INT) AS total_count,
       |ROW_NUMBER() OVER(ORDER BY (or_order.sum_order - de_order.sum_order) DESC) AS sequence,
       |or_order.`year`,
       |or_order.`month`
       |FROM (
       |      SELECT city, province, SUM(order_money) sum_order, COUNT(1) cnt,
       |      CAST(SUBSTRING(etl_date, 1, 4) AS INT) AS `year`,
       |      CAST(SUBSTRING(etl_date, 5, 2) AS INT) AS `month`
       |      FROM n_dwd.fact_order_master
       |      WHERE order_status = '已下单'
       |      GROUP BY city, province,
       |      CAST(SUBSTRING(etl_date, 1, 4) AS INT),
       |      CAST(SUBSTRING(etl_date, 5, 2) AS INT)
       |      ) AS or_order
       |JOIN (
       |       SELECT city, province, SUM(order_money) sum_order, COUNT(1) cnt,
       |       CAST(SUBSTRING(etl_date, 1, 4) AS INT) AS `year`,
       |       CAST(SUBSTRING(etl_date, 5, 2) AS INT) AS `month`
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已退款'
       |       GROUP BY city, province,
       |       CAST(SUBSTRING(etl_date, 1, 4) AS INT),
       |       CAST(SUBSTRING(etl_date, 5, 2) AS INT)
       |       ) AS de_order
       |ON de_order.city = or_order.city
       |AND de_order.province = or_order.province
       |AND de_order.`year` = or_order.`year`
       |AND de_order.`month` = or_order.`month`
       |""".stripMargin)


   spark.sql("SELECT * FROM n_dws.city_consumption_day_aggr ORDER BY total_count DESC, total_amount DESC LIMIT 5").show()

   /*
   +------------+-------------+------------+-----------+--------+----+-----+
   |   city_name|province_name|total_amount|total_count|sequence|year|month|
   +------------+-------------+------------+-----------+--------+----+-----+
   |      上海市|       上海市|   110372845|      25762|       1|2022|    4|
   |      上海市|       上海市|    33649186|       7861|       2|2022|    3|
   |      上海市|       上海市|    28371160|       6739|       3|2022|    5|
   | 浙江省杭州市|       浙江省|    12745388|       3031|       4|2022|    4|
   | 江苏省南京市|       江苏省|    11586943|       2702|       5|2022|    4|
   +------------+-------------+------------+-----------+--------+----+-----+
    */


   /*
   第三题:
   3、 请根据dwd或者dws层表计算出每个城市每个月平均订单金额和该城市所在省份平均订单金额相比较结果(“高/低/相同”),
   存入ClickHouse数据库shtd_result的cityavgcmpprovince表中(表结构如下),
   然后在Linux的ClickHouse命令行中根据城市平均订单金额、省份平均订单金额均为降序排序,查询出前5条;
     字段 类型 中文含义 备注
     cityname text 城市份名称
     cityavgconsumption double 该城市平均订单金额
     provincename text 省份名称
     provinceavgconsumption double 该省平均订单金额
     comparison text 比较结果 城市平均订单金额和该省平均订单金额比较结果,值为:高/低/相同
    */

   spark.sql(
     """
       |SELECT
       |city,
       |province,
       |AVG(order_money) AS cityavgconsumption
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                   SELECT order_sn
       |                   FROM n_dwd.fact_order_master
       |                   WHERE order_status = '已退款'
       |                    )
       |GROUP BY province, city
       |""".stripMargin).createTempView("city_avg")

   spark.sql(
     """
       |SELECT province, AVG(order_money) AS provinceavgconsumption
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                     SELECT order_sn
       |                     FROM n_dwd.fact_order_master
       |                     WHERE order_status = '已退款'
       |                     )
       |GROUP BY province
       |""".stripMargin).createTempView("province_avg")

   val no_3frame = spark.sql(
     """
       |SELECT city_avg.city AS cityname,
       |ROUND(city_avg.cityavgconsumption, 2) AS cityavgconsumption,
       |province_avg.province AS provincename,
       |ROUND(province_avg.provinceavgconsumption, 2) AS provinceavgconsumption,
       |IF(
       |   ROUND(city_avg.cityavgconsumption, 2)
       |    >
       |   ROUND(province_avg.provinceavgconsumption, 2),
       |   '高', IF(
       |           ROUND(city_avg.cityavgconsumption, 2)
       |           <
       |           ROUND(province_avg.provinceavgconsumption, 2),
       |            '低', '相同'
       |            )
       |   ) AS comparison
       |FROM province_avg LEFT JOIN city_avg
       |ON province_avg.province = city_avg.province
       |""".stripMargin)

   // IF(boolean, a, IF(boolean, b, c)) 三元表达式

   no_3frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url","jdbc:clickhouse://127.0.0.1:8123")
     .option("driver", "com.clickhouse.jdbc.ClickHouseDriver")
     .option("dbtable", "shtd_result.cityavgcmpprovince").save()

   // bigdata1 :) SELECT * FROM shtd_result.cityavgcmpprovince ORDER BY cityavgconsumption DESC, provinceavgconsumption DESC LIMIT 5;

   /*
   ┌─cityname─┬─cityavgconsumption─┬─provincename─┬─provinceavgconsumption─┬─comparison─┐
   │ 江苏省淮安市 │             4580.81 │          江苏省│                4228.88 │             高│
   │ 浙江省嘉兴市 │             4580.79 │          浙江省│                 4229.4 │             高│
   │ 浙江省慈溪市 │             4567.32 │          浙江省│                 4229.4 │             高│
   │ 江苏省海门市 │             4488.08 │          江苏省│                4228.88 │             高│
   │ 浙江省海宁市 │             4463.39 │          浙江省│                 4229.4 │             高│
   └───────┴─────────────┴─────────┴───────────────┴──────────┘
    */


 /*
   第四题:
    4、请根据dwd或者dws层表计算出每个城市每个月平均订单金额和该城市所在省份订单金额中位数相比较结果(“高/低/相同”),
        存入ClickHouse数据库shtd_result的citymidcmpprovince表中(表结构如下),
        然后在Linux的ClickHouse命令行中根据城市平均订单金额、省份平均订单金额均为降序排序,查询出前5条;
          字段  类型 中文含义 备注
          cityname  text 城市份名称
          citymidconsumption  double 该城市订单金额中位数
          provincename  text 省份名称
          provincemidconsumption  double 该省订单金额中位数
          comparison  text 比较结果 城市订单金额中位数和该省订单金额中位数比较结果,值为:高/低/相同

      注:此题题目与表结构不一致,如按照题目求出每个城市每个月的平均订单金额与城市所在省份订单金额中位数比较结果,
         则缺少月份字段,结果毫无意义。这里给出两种解法,第一种为:求出每个城市中位数金额与城市所在省份订单金额中
         位数比较结果,第二种为:创建新的表格,包括进月份及年份,求出每个城市每个月的平均订单金额与城市所在省份
         订单金额中位数比较结果
   */

   // 第一种写法

   spark.sql(
     """
       |SELECT province, city, ROUND(AVG(order_money), 2) AS order_money
       |FROM (
       |       SELECT province, city, order_money,
       |       ROW_NUMBER() OVER(PARTITION BY province, city ORDER BY order_money) cnt,
       |       COUNT(1) OVER(PARTITION BY province, city) cp_cnt
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已下单'
       |       AND order_sn NOT IN (
       |                             SELECT order_sn
       |                             FROM n_dwd.fact_order_master
       |                             WHERE order_status = '已退款'
       |                             )
       |       )tmp_b
       |WHERE cnt IN(FLOOR((cp_cnt+1)/2), FLOOR(cp_cnt+2)/2)
       |GROUP BY province, city
       |""".stripMargin).createTempView("city_median")

   /*
   ROUND(AVG(order_money), 2)
   AVG在此次用处为:
           中位数在取时,如果有偶数个订单,会出现两个中位数,此时
           需合并后相除取平均数(需省份和地区作分区字段)
   ROUND(a, b) 为四舍五入函数,a为需要四舍五入的数字,b为保留后几位小数点 例:ROUND(11.987, 2) => 11.99
   FLOOR(a) 为向下取整函数 例:FLOOR(11.9) => 11
   */

   spark.sql(
     """
       |SELECT province, ROUND(AVG(order_money), 2) AS order_money
       |FROM (
       |       SELECT province, order_money,
       |       ROW_NUMBER() OVER(PARTITION BY province ORDER BY order_money) cnt,
       |       COUNT(1) OVER(PARTITION BY province) cp_cnt
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已下单'
       |       AND order_sn NOT IN (
       |                            SELECT order_sn
       |                            FROM n_dwd.fact_order_master
       |                            WHERE order_status = '已退款'
       |                            )
       |       )tmp_a
       |WHERE cnt IN(FLOOR((cp_cnt+1)/2), FLOOR(cp_cnt+2)/2)
       |GROUP BY province
       |""".stripMargin).createTempView("province_median")

   val no_4frame = spark.sql(
     """
       |SELECT
       |city_median.city AS cityname,
       |city_median.order_money AS citymidconsumption,
       |province_median.province AS provincename,
       |province_median.order_money AS provincemidconsumption,
       |IF(
       |   city_median.order_money > province_median.order_money,
       |   '高', IF(city_median.order_money < province_median.order_money,'低', '相同')
       |    ) AS comparison
       |FROM city_median LEFT JOIN province_median
       |ON city_median.province = province_median.province
       |""".stripMargin)

   no_4frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver","com.clickhouse.jdbc.ClickHouseDriver")
     .option("dbtable", "shtd_result.citymidcmpprovince").save()

   // bigdata1 :) SELECT * FROM citymidcmpprovince ORDER BY citymidconsumption DESC, provincemidconsumption DESC LIMIT 5;

   /*
   ┌─cityname─────┬─citymidconsumption─┬─provincename─┬─provincemidconsumption─┬─comparison─┐
   │       浙江省上虞市 │            4384.39 │        浙江省   │                  3925.6 │          高  │
   │       江苏省泰州市 │            4216.66 │        江苏省   │                  3931.4 │          高  │
   │       江苏省淮安市 │            4163.43 │        江苏省   │                  3931.4 │          高  │
   │       江苏省扬州市 │            4152.15 │        江苏省   │                  3931.4 │          高  │
   │       江苏省吴江市 │            4145.63 │        江苏省   │                  3931.4 │          高  │
   └───────────┴────────────┴──────────┴────────────────┴────────┘
    */


   // 第二种写法
   spark.sql(
     """
       |SELECT province, city,
       |ROUND(AVG(order_money), 2) AS order_money,
       |SUBSTRING(etl_date, 1, 4) AS `year`,
       |SUBSTRING(etl_date, 5, 2) AS `month`
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                     SELECT order_sn
       |                     FROM n_dwd.fact_order_master
       |                     WHERE order_status = '已退款'
       |                     )
       |GROUP BY province, city,
       |SUBSTRING(etl_date, 1, 4),
       |SUBSTRING(etl_date, 5, 2)
       |""".stripMargin).createTempView("avg_month")

   // 此处可不创建临时视图,直接使用第二题的答案表(dws.city_consumption_day_aggr),这里为了保证每一题的独立性,不使用

   spark.sql(
     """
       |SELECT province,
       |ROUND(AVG(order_money), 2) AS order_money
       |FROM (
       |SELECT province, order_money,
       |ROW_NUMBER() OVER(PARTITION BY province ORDER BY province) cnt,
       |COUNT(1) OVER(PARTITION BY province) cp_cnt
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                     SELECT order_sn
       |                     FROM n_dwd.fact_order_master
       |                     WHERE order_status = '已退款'
       |                      )
       |       ) tmp_a
       |WHERE cnt IN(FLOOR((cp_cnt+1)/2), FLOOR(cp_cnt+2)/2)
       |GROUP BY province
       |""".stripMargin).createTempView("province_medians")

   spark.sql(
     """
       |SELECT
       |avg_month.city AS cityname,
       |avg_month.order_money AS citymidconsumption,
       |province_medians.province AS provincename,
       |province_medians.order_money AS provincemidconsumption,
       |IF(
       |   avg_month.order_money
       |   >
       |   province_medians.order_money,
       |   '高', IF(
       |             avg_month.order_money
       |             <
       |             province_medians.order_money,
       |             '低', '相同'
       |           )
       |   ) AS comparison,
       |avg_month.`year`,
       |avg_month.`month`
       |FROM province_medians LEFT JOIN avg_month
       |ON province_medians.province = avg_month.province
       |ORDER BY citymidconsumption DESC, provincemidconsumption DESC
       |""".stripMargin).show(5)

   /*
       +------------+------------------+------------+----------------------+----------+----+-----+
       |    cityname|citymidconsumption|provincename|provincemidconsumption|comparison|year|month|
       +------------+------------------+------------+----------------------+----------+----+-----+
       |浙江省上虞市|            8928.6|      浙江省|                   5495.2|         高|2022|   06|
       |江苏省姜堰市|           7965.96|      江苏省|                  8221.04|         低|2022|   07|
       |江苏省吴江市|           6556.79|      江苏省|                  8221.04|         低|2022|   08|
       |浙江省上虞市|           6184.59|      浙江省|                   5495.2|         高|2022|   07|
       |浙江省绍兴市|           6163.55|      浙江省|                   5495.2|         高|2022|   07|
       +------------+------------------+------------+----------------------+----------+----+-----+
    */


   /*
   第五题:
   5、 请根据dwd或者dws层表来计算每个省份2022年订单金额前3省份,
   依次存入ClickHouse数据库shtd_result的regiontopthree表中(表结构如下),
   然后在Linux的ClickHouse命令行中根据省份升序排序,查询出前5条;
     字段 类型 中文含义 备注
     provincename text 省份名称
     citynames  text 城市名称 用,分割显示前三城市的name
     cityamount text 省份名称 用,分割显示前三城市的订单金额(需要去除小数部分,使用四舍五入)
    */

   val no_5frame = spark.sql(
     """
       |SELECT province provincename,
       |CONCAT_WS(',', COLLECT_SET(city)) AS citynames,
       |CONCAT_WS(',', COLLECT_SET(order_money)) AS cityamount
       |FROM (
       |       SELECT province, city, FLOOR(SUM(order_money)) order_money,
       |       ROW_NUMBER() OVER(PARTITION BY province ORDER BY SUM(order_money)) cnt
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已下单'
       |       AND order_sn NOT IN (
       |                             SELECT order_sn
       |                             FROM n_dwd.fact_order_master
       |                             WHERE order_status = '已退款'
       |                             )
       |       GROUP BY province, city
       |       )
       |WHERE cnt <= 3
       |GROUP BY province
       |ORDER BY provincename ASC
       |""".stripMargin)

   no_5frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver", "com.clickhouse.jdbc.ClickHouseDriver")
     .option("dbtable", "shtd_result.regiontopthree").save()

   // COLLECT_SET(a)将分组中的a列转为一个数组返回
   // CONCAT_WS(',', str) 将多个字符串连接成一个字符串,可以一次性指定分隔符


   // bigdata1 :) SELECT * FROM regiontopthree ORDER BY provincename ASC LIMIT 5;

   /*
   ┌─provincename─┬────────citynames───────────────┬─cityamount───────┐
   │ 上海市         │ 上海市                                        │ 183764909            │
   │ 广东省         │ 广东省河源市                                   │ 1142416              │
   │ 江苏省         │ 江苏省江阴市,江苏省镇江市,江苏省溧阳市             │ 859475,822571,936577 │
   │ 浙江省         │ 浙江省奉化市,浙江省上虞市,浙江省金华市             │ 916449,824200,907457 │
   │ 贵州省         │ 贵州省贵阳市                                  │ 4226817              │
   └─────────┴────────────────────────────┴──────────────┘
    */


   /*
   第六题:
   6、请根据dwd或者dws层的相关表,计算销售量前10的商品,
   销售额前10的商品,存入ClickHouse数据库shtd_result的topten表中(表结构如下),
   然后在Linux的ClickHouse命令行中根据排名升序排序,查询出前5条;
     字段 类型 中文含义 备注
     topquantityid  int  商品id 销售量前10的商品
     topquantityname  text 商品名称 销售量前10的商品
     topquantity  int  该商品销售量 销售量前10的商品
     toppriceid text 商品id 销售额前10的商品
     toppricename text 商品名称 销售额前10的商品
     topprice decimal  该商品销售额 销售额前10的商品
     sequence int  排名 所属排名
    */

   spark.sql(
     """
       |SELECT
       |product_id AS topquantityid,
       |product_name AS topquantityname,
       |SUM(product_cnt) AS topquantity,
       |ROW_NUMBER() OVER(ORDER BY SUM(product_cnt) DESC) AS sequence
       |FROM n_dwd.fact_order_detail
       |WHERE order_sn NOT IN (
       |                       SELECT order_sn
       |                       FROM n_dwd.fact_order_master
       |                       WHERE order_status = '已退款'
       |                        )
       |GROUP BY product_id, product_name
       |""".stripMargin).createTempView("sell_count")

   spark.sql(
     """
       |SELECT
       |product_id AS toppriceid,
       |product_name AS toppricename,
       |ROUND(product_price * SUM(product_cnt), 2) AS topprice,
       |ROW_NUMBER() OVER(ORDER BY product_price * SUM(product_cnt) DESC) AS sequence
       |FROM n_dwd.fact_order_detail
       |WHERE order_sn NOT IN (
       |                       SELECT order_sn
       |                       FROM n_dwd.fact_order_master
       |                       WHERE order_status = '已退款'
       |                       )
       |GROUP BY product_id, product_name, product_price
       |""".stripMargin).createTempView("sell_amount")


   val no_6frame = spark.sql(
     """
       |SELECT
       |topquantityid,
       |topquantityname,
       |topquantity,
       |toppriceid,
       |toppricename,
       |topprice,
       |sell_count.sequence
       |FROM sell_count JOIN sell_amount
       |ON sell_count.sequence <= 10
       |AND sell_amount.sequence <= 10
       |AND sell_count.sequence = sell_amount.sequence
       |""".stripMargin)

   no_6frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver", "com.clickhouse.jdbc.ClickHouseDiver")
     .option("dbtable", "shtd_result.topten").save()

   // bigdata1 :) SELECT * FROM topten ORDER BY sequence LIMIT 5;

   /*
   ┌─topquantityid─┬─topquantityname───────────┬─topquantity┬─toppriceid┬─toppricename──────────────┬─topprice─┬─sequence─┐
   │           599 │ 正点原子蓝牙4.2串口透传模块 ATK-       │         89 │ 599        │ 正点原子蓝牙4.2串口透传模块 ATK-         │ 77917.72 │         1 │
   │          7122 │ 夏新无线蓝牙耳机5.0单双耳一对迷你隐形    │         76 │ 11976      │ 爆款i12马卡龙多彩磨砂金属电镀蓝牙耳机5.0  │ 66334.68 │          2 │
   │           322 │ 华为Nova双耳真无线蓝牙耳机Nova5       │         74 │ 7122       │ 夏新无线蓝牙耳机5.0单双耳一对迷你隐形      │  64383.4 │         3 │
   │          7009 │ 不入耳式蓝牙耳机双耳骨传导无线运动跑步概 │         74 │ 9193       │ 傲古声 蓝牙耳机双耳入耳式5.0苹果安卓       │ 63572.52 │         4 │
   │          9344 │ 大电量无线蓝牙耳机超长续航双耳苹果华为真 │         71 │ 9344       │ 大电量无线蓝牙耳机超长续航双耳苹果华为真     │ 63331.29 │         5 │
   └─────────┴─────────────────────┴────────┴───────┴────────────────────────┴──────┴───────┘
    */

   /*
   第七题:
   7、请根据dwd或者dws层的数据,请计算连续两天下单的用户与已下单用户的占比,
       将结果存入ClickHouse数据库shtd_result的userrepurchasedrate表中(表结构如下),
       然后在Linux的ClickHouse命令行中查询结果数据;
          字段  类型 中文含义 备注
          purchaseduser int  下单人数 已下单人数
          repurchaseduser int  连续下单人数 连续两天下单的人数
          repurchaserate  text 百占比  连续两天下单人数/已下单人数百分比(保留1位小数,四舍五入,不足的补0)例如21.1%,或者32.0%
    */

   spark.sql(
     """
       |SELECT COUNT(1) order_num
       |FROM (
       |      SELECT customer_id, COUNT(1) days_count
       |      FROM (
       |            SELECT DISTINCT customer_id, etl_date,
       |            DENSE_RANK() OVER(PARTITION BY customer_id ORDER BY etl_date) date
       |            FROM n_dwd.fact_order_master
       |            WHERE order_status = '已下单'
       |            AND order_sn NOT IN (
       |                                 SELECT order_sn
       |                                 FROM n_dwd.fact_order_master
       |                                 WHERE order_status = '已退款'
       |                                 )
       |             ) tmp_a
       |        GROUP BY customer_id, etl_date - date
       |        HAVING days_count > 1
       |        ORDER BY customer_id
       |       ) tmp_b
       |""".stripMargin).createTempView("continuous_order")

   val no_7frame = spark.sql(
     """
       |SELECT total_num AS purchaseduser,
       |order_num AS repurchaseduser,
       |CONCAT(CAST(ROUND((order_num/total_num)*100, 1) AS STRING), '%') AS repurchaserate
       |FROM (
       |      SELECT COUNT(1) total_num
       |      FROM n_dwd.fact_order_master
       |      WHERE order_status = '已下单'
       |      AND order_sn NOT IN (
       |                            SELECT order_sn
       |                            FROM n_dwd.fact_order_master
       |                            WHERE order_status = '已退款'
       |                            )
       |      ) tmp_a JOIN continuous_order
       |""".stripMargin)

   no_7frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver", "com.clickhouse.jdbc.ClickHouseDiver")
     .option("dbtable", "shtd_result.userrepurchasedrate").save()

   // bigdata1 :) SELECT * FROM userrepurchasedrate;

   /*
   ┌─purchaseduser─┬─repurchaseduser─┬─repurchaserate─┐
   │         73296 │             3936 │ 5.4%              │
   └─────────┴───────────┴────────────┘
    */


   /*
   第八题:
   8、根据dwd或者dws层的数据,请计算每个省份累计订单量,然后根据每个省份订单量从高到低排列,
   将结果打印到控制台(使用spark中的show算子,同时需要显示列名);
       例如:可以考虑首先生成类似的临时表A:
       province_name  Amount(订单量)
       A省 10122
       B省 301
       C省 2333333

       然后生成结果类似如下:其中C省销量最高,排在第一列,A省次之,以此类推。
       C省 A省 B省
       23333331 10122  301
    */

   // 第一种方法

   spark.sql(
     """
       |SELECT
       |SUM(IF(province = '上海市', amount, 0)) AS Shanghai,
       |SUM(IF(province = '江苏省', amount, 0)) AS Jiangsu,
       |SUM(IF(province = '浙江省', amount, 0)) AS Zhejiang,
       |SUM(IF(province = '贵州省', amount, 0)) AS Guizhou,
       |SUM(IF(province = '广东省', amount, 0)) AS Guangdong
       |FROM (
       |       SELECT province, COUNT(order_money) amount
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已下单'
       |       AND order_sn NOT IN(
       |                           SELECT order_sn
       |                           FROM n_dwd.fact_order_master
       |                           WHERE order_status = '已退款'
       |                           )
       |       GROUP BY province
       |       )
       |""".stripMargin).show()

   /*
   +--------+-------+--------+-------+---------+
   |Shanghai|Jiangsu|Zhejiang|Guizhou|Guangdong|
   +--------+-------+--------+-------+---------+
   |   42983|  17353|   11736|    965|      259|
   +--------+-------+--------+-------+---------+
    */

   // 第二种方法
   spark.sql(
     """
       |SELECT
       |SUM(IF(province = '上海市', 1, 0)) AS Shanghai,
       |SUM(IF(province = '江苏省', 1, 0)) AS Jiangsu,
       |SUM(IF(province = '浙江省', 1, 0)) AS Zhejiang,
       |SUM(IF(province = '贵州省', 1, 0)) AS Guizhou,
       |SUM(IF(province = '广东省', 1, 0)) AS Guangdong
       |FROM n_dwd.fact_order_master
       |WHERE order_status = '已下单'
       |AND order_sn NOT IN (
       |                      SELECT order_sn
       |                      FROM n_dwd.fact_order_master
       |                      WHERE order_status = '已退款'
       |                      )
       |""".stripMargin).show()

   /*
   +--------+-------+--------+-------+---------+
   |Shanghai|Jiangsu|Zhejiang|Guizhou|Guangdong|
   +--------+-------+--------+-------+---------+
   |   42983|  17353|   11736|    965|      259|
   +--------+-------+--------+-------+---------+
   */


   /*
   第九题:
   9、 根据dwd或者dws层的相关表,请计算2022年4月26日凌晨0点0分0秒到早上9点59分59秒为止,
   该时间段每小时的新增订单金额与当天订单总金额累加值,存入ClickHouse数据库shtd_result的
   accumulateconsumption表中,然后在Linux的ClickHouse命令行中根据订单时间段升序排序,查询出前5条;
       假如数据为:
       用户 订单时间 订单金额
       张三1号 2020-04-26 00:00:10  10
       李四1号 2020-04-26 00:20:10  5
       李四2号 2020-04-26 01:21:10  10
       王五1号 2020-04-26 03:20:10  50

       计算结果则为:
       订单时间段  订单新增金额 累加总金额
       2020-04-26 00  15 15
       2020-04-26 01  10 25
       2020-04-26 02  0  25
       2020-04-26 03  50 75

       accumulateconsumption表结构如下:
       字段 类型 中文含义 备注
       consumptiontime  varchar  消费时间段
       consumptionadd double 订单新增金额
       consumptionacc double 累加总金额
    */

   val no_9frame = spark.sql(
     """
       |SELECT CONCAT('2022-04-26 0', hour_time) AS consumptiontime,
       |ROUND(hour_money, 2) AS consumptionadd,
       |CAST(SUM(hour_money) OVER(ORDER BY hour_time) AS BIGINT) AS consumptionacc
       |FROM(
       |      SELECT HOUR(modified_time) hour_time, SUM(order_money) hour_money
       |      FROM (
       |            SELECT DISTINCT shipping_user, modified_time, order_money
       |            FROM n_dwd.fact_order_master
       |            WHERE order_status = '已下单'
       |            AND modified_time BETWEEN '2022-04-26 00:00:00'
       |            AND '2022-04-26 10:00:00'
       |            AND order_sn NOT IN (
       |                                   SELECT order_sn
       |                                   FROM n_dwd.fact_order_master
       |                                   WHERE order_status = '已退款'
       |                                   )
       |            ) tmp_a
       |      GROUP BY HOUR(modified_time)
       |     )tmp_b
       |UNION ALL
       |SELECT '2022-04-26 00', 0, 0
       |UNION ALL
       |SELECT '2022-04-26 01', 0, 0
       |UNION ALL
       |SELECT '2022-04-26 02', 0, 0
       |ORDER BY consumptiontime ASC
       |""".stripMargin)

   no_9frame
     .write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver", "com.clickhouse.jdbc.ClickHouseDiver")
     .option("dbtable", "shtd_result.accumulateconsumption").save()

   // CAST(SUM(hour_money) OVER(ORDER BY hour_time) AS BIGINT)在此处的用处
   // CAST(a AS b) 将a字段转换为b类型
   // 在SUM函数后加开窗排序,可得到累加的效果

   // OS:不开窗能想到的方法只有这个了,面向答案编程(bus)

   /*
   ┌─consumptiontime─┬─consumptionadd─┬─consumptionacc─┐
   │ 2022-04-26 00   │              0 │                 0 │
   │ 2022-04-26 01   │              0 │                 0 │
   │ 2022-04-26 02   │              0 │                 0 │
   │ 2022-04-26 03   │          22723 │             22723 │
   │ 2022-04-26 04   │           4838 │             27561 │
   └──────────┴──────────┴─────────────┘
    */


   /*
   第十题:
   10、  根据dwd层或dws层的相关表,请计算2022年4月26日凌晨0点0分0秒到早上9点59分59秒为止的数据,
   以5个小时为时间窗口,滑动的步长为1小时,做滑动窗口计算该窗口内订单总金额和订单总量,时间不满5小时
   不触发计算(即从凌晨5点0分0秒开始触发计算),存入ClickHouse数据库shtd_result的
   slidewindowconsumption表中,然后在Linux的ClickHouse命令行中根据订单时间段升序排序,
   查询出前5条,将核心业务代码中的开窗相关代码与MySQL查询结果展示出来。

     假如数据为:
     用户 订单时间 订单金额
     张三1号 2020-04-26 00:00:10  10
     李四1号 2020-04-26 00:20:10  25
     李四2号 2020-04-26 01:21:10  10
     李四2号 2020-04-26 02:21:10  5
     王五1号 2020-04-26 03:20:10  20
     李四2号 2020-04-26 04:20:10  10
     王五2号 2020-04-26 05:10:10  10
     李四2号 2020-04-26 06:20:10  10
     赵六2号 2020-04-26 07:10:10  10
     赵六2号 2020-04-26 08:10:10  10
     王五2号 2020-04-26 09:11:10  10
     王五4号 2020-04-26 09:32:10  30

     计算结果则为:
     订单时间段  该窗口内订单金额 订单总量 平均每单价格
     2020-04-26 04  80 6  13.33
     2020-04-26 05  55 5  11
     2020-04-26 06  55 5  11
     2020-04-26 07  60 5  12
     2020-04-26 08  50 5  10
     2020-04-26 09  80 6  13.33

     slidewindowconsumption表结构如下:
     字段 类型 中文含义 备注
     consumptiontime  varchar  订单时间段
     consumptionsum double 该窗口内的订单总金额
     consumptioncount double 订单总数量
     consumptionavg double 平均每单价格 上面两个字段相除,四舍五入保留两位小数
    */

   // UNIX_TIMESTAMP(A, B) 时间字段转时间戳
   // A 代表类时间字段, B 代表时间字段的格式

   // FROM_UNIXTIME(A, B) 时间戳转时间格式
   // A 代表类时间戳, B 代表要转成的时间格式


   spark.sql(
     """
       |SELECT order_money,
       |FROM_UNIXTIME(UNIX_TIMESTAMP(modified_time, 'yyyy-MM-dd HH:mm:ss'), 'yyyy-MM-dd HH') AS modified_time
       |FROM (
       |       SELECT DISTINCT modified_time, order_money
       |       FROM n_dwd.fact_order_master
       |       WHERE order_status = '已下单'
       |       AND modified_time BETWEEN '2022-04-26 00:00:00'
       |       AND '2022-04-26 10:00:00'
       |       AND order_sn NOT IN (
       |                            SELECT order_sn
       |                            FROM n_dwd.fact_order_master
       |                            WHERE order_status = '已退款'
       |                            )
       |       ) tmp_a
       |""".stripMargin).createTempView("fiver_order")

   val no_10frame = spark.sql(
     """
       |SELECT consumptiontime,
       |SUM(consumptionsum) OVER(ORDER BY consumptiontime ROWS 4 PRECEDING) AS consumptionsum,
       |SUM(consumptioncount) OVER(ORDER BY consumptiontime ROWS 4 PRECEDING) AS consumptioncount,
       |AVG(consumptionavg) OVER(ORDER BY consumptiontime ROWS 4 PRECEDING) AS consumptionavg
       |FROM (
       |       SELECT modified_time AS consumptiontime,
       |       SUM(order_money) AS consumptionsum,
       |       COUNT(order_money) AS consumptioncount,
       |       AVG(order_money) AS consumptionavg
       |       FROM fiver_order
       |       GROUP BY modified_time
       |       ) tmp_a
       |WHERE HOUR(consumptiontime) > 4
       |""".stripMargin)

   no_10frame.write.mode(SaveMode.Append)
     .format("jdbc").option("url", "jdbc:clickhouse://127.0.0.1:8123")
     .option("diver", "com.jdbc.clickhouse.ClickHouseDiver")
     .option("dbtable", "shtd_result.slidewindowconsumption").save()

   // bigdata1 :) SELECT * FROM slidewindowconsumption ORDER BY consumptiontime ASC LIMIT 5;

   /*
   ┌─consumptiontime─┬──consumptionsum─┬─consumptioncount─┬───consumptionavg─┐
   │ 2022-04-26 05   │          184952.67 │               44 │             4203.46 │
   │ 2022-04-26 06   │          557175.09 │              128 │            4317.34 │
   │ 2022-04-26 07   │          890390.89 │              211 │           4216.445 │
   │ 2022-04-26 08   │         1282905.98 │              311 │            4143.62 │
   │ 2022-04-26 09   │         1714063.44 │              403 │          4252.1965 │
   └──────────┴─────────────┴───────────┴─────────────┘
    */


   spark.stop()
   spark.close()

 }

}

电商数据处理_指标计算的评论 (共 条)

分享到微博请遵守国家法律