徜徉在知识海洋的一群鲸鱼
Spark具体分析数据流程
Spark具体分析数据流程

Spark具体分析数据流程

本次项目具体需要实现以下几点需求:

用户访问 session 统计

该模块主要是对用户访问 session 进行统计分析,包括 session 聚合指标计算、按时间比例随机抽取 session、获取每天点击、下单和购买排名前 10 的品类、并获取 top10 品类中排名前 10 的 session。该模块可以让产品经理、数据分析师以及企业管理层形象地看到各种条件下的具体用户行为以及统计指标,从而对公司的产品设计以及业务发展战略做出调整。主要使用 Spark Core 实现。

页面单跳转化率统计

该模块主要是计算关键页面之间的单步跳转转化率,涉及到页面切片算法以及页面流匹配算法。该模块可以让产品经理、数据分析师以及企业管理层看到各个关键页面之间的转化率,从而对网页布局,进行更好的优化设计。主要使用 Spark Core实现。

区域热门商品统计

该模块主要实现每天统计出各个区域的 top3 热门商品。该模块可以让企业管理层看到电商平台在不同区域出售的商品的整体情况,从而对公司的商品相关的战略进行调整。主要使用 Spark SQL 实现。

广告流量实时统计

该模块负责实时统计公司的广告流量,包括广告展现流量和广告点击流量。实现动态黑名单机制,以及黑名单过滤;实现滑动窗口内的各城市的广告展现流量和广告点击流量的统计;实现每个区域每个广告的点击流量实时统计;实现每个区域top3 点击量的广告的统计。主要使用 Spark Streaming 实现。

数据处理

对于离线数据,我们将数据存储到数据库仓库中,之后使用spark分析。

user_visit_action

字段名称说明
date日期,代表这个用户点击行为是在哪一天 发生的
user_id用户 ID,唯一地标识某个用户
session_idSession ID,唯一地标识某个用户的一个访 问 session
page_id页面 ID,点击了某些商品/品类,也可能是搜索了某个关键词,然后进入了某个页面, 页面的 id
action_time动作时间,这个点击行为发生的时间点
search_keyword搜索关键词,如果用户执行的是一个搜索 行为,比如说在网站/app 中,搜索了某个关键词,然后会跳转到商品列表页面;
click_category_id点击品类 ID,可能是在网站首页,点击了 某个品类(美食、电子设备、电脑)
click_product_id点击商品 ID,可能是在网站首页,或者是在商品列表页,点击了某个商品(比如呷哺 呷哺火锅 XX 路店 3 人套餐、iphone 6s)
order_category_ids下单品类 ID,代表了可能将某些商品加入了购物车,然后一次性对购物车中的商品下了一个订单,这就代表了某次下单的行为中, 有哪些商品品类,可能有 6 个商品,但是就 对应了 2 个品类,比如有 3 根火腿肠(食品 品类),3 个电池(日用品品类)
order_product_ids下单商品 ID,某次下单,具体对哪些商品 下的订单
pay_category_ids付款品类 ID,对某个订单,或者某几个订单,进行了一次支付的行为,对应了哪些品 类
pay_product_ids付款商品 ID,支付行为下,对应的哪些具 体的商品
city_id城市 ID,代表该用户行为发生在哪个城市

user_info

字段名称说明
user_id用户 ID,唯一地标识某个用户
username用户登录名
name用户昵称或真实姓名
age用户年龄
professional用户职业
city用户所在城市
sex用户性别

product_info

字段名称说明
proudct_id商品 ID,唯一地标识某个商品
product_name商品名称
extend_info额外信息,例如商品为自营商品还是第三 方商品

对于实时数据,我们采取每 5 秒向 Kafka 集群写入数据,格式如下:

格式 :timestamp province city userid adid

字段名称取值范围
timestamp当前时间毫秒
userId0 – 99
provice/city1 – 9 ((0L,” 北京”,” 北京”),(1L,” 上海”,” 上海”),(2L,” 南京”,” 江苏省 “),(3L,”广州”,”广东省”),(4L,”三亚”,”海南省”),(5L,”武汉”,”湖北省 “),(6L,”长沙”,”湖南省”),(7L,”西安”,”陕西省”),(8L,”成都”,”四川省”),(9L,”哈尔滨”,”东北省”))
adid0 – 19

模块分析

commons 模块

包名称(package解析
conf配置工具类 获取 commerce.properties 文件中的所有配置信息,使用户可以通过对象的方式访问
 commerce.properties 中的所有配置
constant常量接口 包括项目中所需要使用的所有常量
modelSpark SQL 样例类 包括 Spark SQL 中的用户访问动作表、用户信息表、产品表的样例类
poolMySQL 连接池 通过自定义MySQL 连接池实现对MySQL 数据库的操作
utils工具类 提供了日期时间工具类、数字格式工具类、参数工具类、字符串工具类、校验工具类等工具类

conf 包工具类

constant 包常量类

model 包用户数据表

utils 包

类名称(class解析
DateUtils时间工具类 负责时间的格式化、判断时间先后、计算时间差值、获取指定日期等工作
NumberUtils数字工具类 负责数字的格式化工作
ParamUtils参数工具类 负责从JSON 对象中提取参数
StringUtils字符串工具类 负责字符串是否为空判断、字符串截断与补全、从拼接字符串中提取字段、给拼接字符串中字段设置值等工作
ValidUtils校验工具类 负责校验数据中的指定字段是否在指定范围范围内、校验数据中的指定字段中是否有值与参数字段相同、校验数据中的指定字段是否与参数字段相同等工作

mock 模块

Object解析
MockDataGenerate离线模拟数据生成 负责生成离线模拟数据并写入Hive 表中,模拟数据包括用户行为信息、用户信息、产品数据信息等
MockRealtimeDataGenerate实时模拟数据生成 负责生成实时模拟数据并写入 Kafka 中,实时模拟数据为实时广告数据

analyse 模块

数据结构解析

//***************** 输入表 *********************

/**
  * 用户访问动作表
  *
  * @param date               用户点击行为的日期
  * @param user_id            用户的ID
  * @param session_id         Session的ID
  * @param page_id            某个页面的ID
  * @param action_time        点击行为的时间点
  * @param search_keyword     用户搜索的关键词
  * @param click_category_id  某一个商品品类的ID
  * @param click_product_id   某一个商品的ID
  * @param order_category_ids 一次订单中所有品类的ID集合
  * @param order_product_ids  一次订单中所有商品的ID集合
  * @param pay_category_ids   一次支付中所有品类的ID集合
  * @param pay_product_ids    一次支付中所有商品的ID集合
  * @param city_id            城市ID
  */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long
                          )

/**
  * 用户信息表
  *
  * @param user_id      用户的ID
  * @param username     用户的名称
  * @param name         用户的名字
  * @param age          用户的年龄
  * @param professional 用户的职业
  * @param city         用户所在的城市
  * @param sex          用户的性别
  */
case class UserInfo(user_id: Long,
                    username: String,
                    name: String,
                    age: Int,
                    professional: String,
                    city: String,
                    sex: String
                   )

/**
  * 产品表
  *
  * @param product_id   商品的ID
  * @param product_name 商品的名称
  * @param extend_info  商品额外的信息
  */
case class ProductInfo(product_id: Long,
                       product_name: String,
                       extend_info: String
                      )

作为产生数据的初始表,会在产生数据以及后面的分析数据用到。

需求一:Session 各范围访问步长、访问时长占比统计

需求一要统计出符合筛选条件的 session 中,访问时长在 1s~3s、4s~6s、7s~9s、10s~30s、30s~60s、1m~3m、3m~10m、10m~30m、30m 以上各个范围内的 session 占比;访问步长在 1~3、4~6、7~9、10~30、30~60、60 以上各个范围内的 session 占比,并将结果保存到 MySQL 数据库中。

数据库表

DROP TABLE IF EXISTS `session_aggr_stat`; CREATE TABLE `session_aggr_stat` (
`taskid` varchar(255) DEFAULT NULL,
`session_count` int(11) DEFAULT NULL,
`visit_length_1s_3s_ratio` double DEFAULT NULL,
`visit_length_4s_6s_ratio` double DEFAULT NULL,
`visit_length_7s_9s_ratio` double DEFAULT NULL,
`visit_length_10s_30s_ratio` double DEFAULT NULL,
`visit_length_30s_60s_ratio` double DEFAULT NULL,
`visit_length_1m_3m_ratio` double DEFAULT NULL,
`visit_length_3m_10m_ratio` double DEFAULT NULL,
`visit_length_10m_30m_ratio` double DEFAULT NULL,
`visit_length_30m_ratio` double DEFAULT NULL,
`step_length_1_3_ratio` double DEFAULT NULL,
`step_length_4_6_ratio` double DEFAULT NULL,
`step_length_7_9_ratio` double DEFAULT NULL,
`step_length_10_30_ratio` double DEFAULT NULL,
`step_length_30_60_ratio` double DEFAULT NULL,
`step_length_60_ratio` double DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

该需求需要完成

对 Session 数据进行聚合,对行为数据按 session 粒度进行分组,对每一个 session 分组进行聚合,将 session 中所有的搜索词和点击品类都聚合起来,遍历 session 所有的访问行为,

并不是每一行访问行为都有 searchKeyword 何 clickCategoryId 两个字段的,只有搜索行为,是有 searchKeyword 字段的,只有点击品类的行为,是有 clickCategoryId 字段的,所以,任何一行行为数据,都不可能两个字段都有,所以数据是可能出现 null 值的,是否将搜索词或点击品类 id 拼接到字符串中去,首先要满足:不能是 null 值,其次,之前的字符串中还没有搜索词或者点击品类 id,计算 session 开始和结束时间,计算 session 访问步长,计算 session 访问时长(秒),查询所有用户数据,并映射成<userid,Row>的格式,将 session 粒度聚合数据,与用户信息进行 join,对 join 起来的数据进行拼接,并且返回<sessionid,fullAggrInfo>格式的数据,后面就是读取hive表,并按照需求书写相应的spark RDD和sql代码。

需求二:Session 随机抽取

在符合条件的 session 中,按照时间比例随机抽取 1000 个 session,抽取完毕之后,需要将 Session 的相关信息和详细信息保存到 MySQL 数据库中。

数据库表

DROP TABLE IF EXISTS `session_detail`; CREATE TABLE `session_detail` (
`taskid` varchar(255) DEFAULT NULL,
`userid` int(11) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`pageid` int(11) DEFAULT NULL,
`actionTime` varchar(255) DEFAULT NULL,
`searchKeyword` varchar(255) DEFAULT NULL,
`clickCategoryId` int(11) DEFAULT NULL,
`clickProductId` int(11) DEFAULT NULL,
`orderCategoryIds` varchar(255) DEFAULT NULL,
`orderProductIds` varchar(255) DEFAULT NULL,
`payCategoryIds` varchar(255) DEFAULT NULL,
`payProductIds` varchar(255) DEFAULT NULL, KEY `idx_task_id` (`taskid`),
KEY `idx_session_id` (`sessionid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `session_random_extract`; CREATE TABLE `session_random_extract` (
`taskid` varchar(255) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`startTime` varchar(50) DEFAULT NULL,
`searchKeywords` varchar(255) DEFAULT NULL,
`clickCategoryIds` varchar(255) DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

我们需要计算出每天每小时的 session 数量,获取<yyyy-MM-dd_HH,aggrInfo>格式的 RDD,得到每天每小时的 session 数量

使用按时间 比例随机抽 取算法, 计算 出每天每小 时要抽取 session 的索引,将

<yyyy-MM-dd_HH,count>格式的 map,转换成<yyyy-MM-dd,<HH,count>>的格式,按时间比例随机抽取算法,总共要抽取 100 个 session,先按照天数,进行平分,计算每个小时的 session 数量,占据当天总 session 数量的比例,直接乘以每天要抽取的数量,一旦随机生成的 index 已经存在,重新获取,直到获取到之前没有的 index,

遍历每天每小时的 session,然后根据随机索引进行抽取,我们用 flatMap 算子,遍历所有的

<dateHour,(session aggrInfo)>格式的数据,

获取抽取出来的 session 的明细数据,根据 sessionId 与详细数据进行聚合,

需求三:Top10热门品类

数据中的每个 session 可能都会对一些品类的商品进行点击、下单和支付等等行为, 那么现在就需要获取这些 session 点击、下单和支付数量排名前 10 的最热门的品类。也就是说,要计算出所有这些 session 对各个品类的点击、下单和支付的次数, 然后按照这三个属性进行排序,获取前 10 个品类。

数据库

DROP TABLE IF EXISTS `top10_category`; CREATE TABLE `top10_category` (
`taskid` varchar(255) DEFAULT NULL,
`categoryid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL,
`orderCount` int(11) DEFAULT NULL,
`payCount` int(11) DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

获取每一个 Sessionid 点击过、下单过、支付过的数量,计算各品类的点击、下单和支付的次数,

join 各品类与它的点击、下单和支付的次数,distinctCategoryIdRDD 中是所有产生过点击、下单、支付行为的商品类别,通过 distinctCategoryIdRDD 与各个统计数据的 LeftJoin 保证数据的完整性,将数据映射成<CategorySortKey,info>格式的 RDD,然后进行二次排序(降序),用 take(10)取出 top10 热门品类,并写入 MySQL。

需求四:Top10热门品类 Top10活跃 Session 统计

对于排名前 10 的品类,分别获取其点击次数排名前 10 的 session。

数据库

DROP TABLE IF EXISTS `top10_session`; CREATE TABLE `top10_session` (
`taskid` varchar(255) DEFAULT NULL,
`categoryid` int(11) DEFAULT NULL,
`sessionid` varchar(255) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL, KEY `idx_task_id` (`taskid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

将 top10 热门品类的 id,生成一份 RDD,计算 top10 品类被各 session 点击的次数,分组取 TopN 算法实现,获取每个品类的 top10 活跃用户,获取 top10 活跃 session 的明细数据,将活跃 Session 的明细数据,写入到 MySQL

需求五:页面转化率统计

计算页面单跳转化率, 什么是页面单跳转换率, 比如一个用户在一次 Session 过程中访问的页面路径 3,5,7,9,10,21,那么页面 3 跳到页面 5 叫一次单跳,7-9 也叫一次单跳,那么单跳转化率就是要统计页面点击的概率,比如: 计算 3-5 的单跳转化率,先获取符合条件的 Session 对于页面 3 的访问次数(PV)为 A,然后获取符合条件的 Session 中访问了页面 3 又紧接着访问了页面 5 的次数为 B,那么 B/A 就是 3-5 的页面单跳转化率,我们记为 C;那么页面 5-7 的转化率怎么求呢?先需要求出符合条件的 Session 中访问页面 5 又紧接着访问了页面 7 的次数为 D,那么 D/B 即为 5-7 的单跳转化率。

DROP TABLE IF EXISTS `page_split_convert_rate`; CREATE TABLE `page_split_convert_rate` (
`taskid` varchar(255) DEFAULT NULL,
`convertRate` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
页面转化率的求解思路是通过 UserAction 表获取一个 session 的所有 UserAction,根据时间顺序排序后获取全部 PageId
然后将PageId 组合成PageFlow,即1,2,3,4,5 的形式(按照时间顺序排列),之后,组合为1_2, 2_3, 3_4, ...
的形式
然后筛选出出现在 targetFlow 中的所有 A_B
对每个 A_B 进行数量统计,然后统计 startPage 的 PV,之后根据 targetFlow 的 A_B 顺序,计算每一层的转化率

需求六:各区域 Top3商品统计

根据用户指定的日期查询条件范围,统计各个区域下的最热门【点击】的 top3 商品

DROP TABLE IF EXISTS `area_top3_product`; CREATE TABLE `area_top3_product` (
`taskid` varchar(255) DEFAULT NULL,
`area` varchar(255) DEFAULT NULL,
`areaLevel` varchar(255) DEFAULT NULL,
`productid` int(11) DEFAULT NULL,
`cityInfos` varchar(255) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL,
`productName` varchar(255) DEFAULT NULL,
`productStatus` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
1.	查询 task,获取日期范围,通过 Spark SQL,查询 user_visit_action 表中的指定日期范围内的数据,过滤出,商品点击行为, click_product_id is not null ; click_product_id != 'NULL'; click_product_id != 'null'; city_id, click_product_id。
2.	使用 Spark SQL 从 MySQL 中查询出来城市信息( city_id、city_name、area), 用户访问行为数据要跟城市信息进行 join, city_id、city_name、area、product_id, RDD,转换成 DataFrame,注册成一个临时表。
3.	Spark SQL 内置函数( case when),对 area 打标记(华东大区,A 级,华中大区, B 级, 东北大区, C 级, 西北大区, D 级), area_level。
4.	计算出来每个区域下每个商品的点击次数, group by area, product_id;保留每个区域的城市名称列表; 自定义 UDAF, group_concat_distinct()函数,聚合出来一个 city_names 字段, area、product_id、city_names、click_count。
5.	join 商品明细表,hive( product_id、product_name、extend_info),extend_info 是 json 类型,自定义 UDF,get_json_object()函数,取出其中的 product_status 字段, if()函数( Spark SQL 内置函数), 判断, 0 自营, 1 第三方;( area、product_id、city_names、click_count、product_name、product_status)。
6.	开窗函数,根据 area 来聚合,获取每个 area 下,click_count 排名前 3 的 product信息; area 、 area_level 、 product_id 、 city_names 、 click_count 、 product_name 、product_status
7.	结果写入 MySQL 表中。
 

需求七:广告黑名单实时统计

实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。

DROP TABLE IF EXISTS `ad_blacklist`; CREATE TABLE `ad_blacklist` (
`userid` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS `ad_user_click_count`; CREATE TABLE `ad_user_click_count` (
`date` varchar(30) DEFAULT NULL,
`userid` int(11) DEFAULT NULL,
`adid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

需求八:广告点击量实时统计

每天各省各城市各广告的点击流量实时统计。

DROP TABLE IF EXISTS `ad_stat`; CREATE TABLE `ad_stat` (
`date` varchar(30) DEFAULT NULL,
`province` varchar(100) DEFAULT NULL,
`city` varchar(100) DEFAULT NULL,
`adid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

需求九:各省热门广告实时统计

统计每天各省 top3 热门广告。

DROP TABLE IF EXISTS `ad_province_top3`; CREATE TABLE `ad_province_top3` (
`date` varchar(30) DEFAULT NULL,
`province` varchar(100) DEFAULT NULL,
`adid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

需求十:最近一小时广告点击量实时统计

统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量。

DROP TABLE IF EXISTS `ad_click_trend`; CREATE TABLE `ad_click_trend` (
`date` varchar(30) DEFAULT NULL,
`hour` varchar(30) DEFAULT NULL,
`minute` varchar(30) DEFAULT NULL,
`adid` int(11) DEFAULT NULL,
`clickCount` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;