实时统计每天pv,uv的sparkStreaming结合redis结果存入mysql供前端展示
最近有个需求,实时统计pv,uv
,结果按照date,hour,pv,uv
来展示,按天统计,第二天重新统计,当然了实际还需要按照类型字段分类统计pv,uv
,比如按照date,hour,pv,uv,type
来展示。这里介绍最基本的pv,uv
的展示。
1、项目流程
日志数据从flume
采集过来,落到hdfs
供其它离线业务使用,也会sink
到kafka
,sparkStreaming
从kafka
拉数据过来,计算pv,uv
,uv
是用的redis
的set
集合去重,最后把结果写入mysql
数据库,供前端展示使用。
2、具体过程
1)pv
的计算
拉取数据有两种方式,基于received
和direct
方式,这里用direct
直拉的方式,用的mapWithState
算子保存状态,这个算子与updateStateByKey
一样,并且性能更好。当然了实际中数据过来需要经过清洗,过滤,才能使用。
定义一个状态函数
这样就很容易的把pv
计算出来了。
2)uv的计算
uv是要全天去重的,每次进来一个batch
的数据,如果用原生的reduceByKey
或者groupByKey
对配置要求太高,在配置较低情况下,我们申请了一个93G
的redis
用来去重,原理是每进来一条数据,将date
作为key
,guid
加入set
集合,20
秒刷新一次,也就是将set
集合的尺寸取出来,更新一下数据库即可。
redis连接池代码RedisPoolUtil.scala
:
3)结果保存到数据库
结果保存到mysql
,数据库,20
秒刷新一次数据库,前端展示刷新一次,就会重新查询一次数据库,做到实时统计展示pv,uv
的目的。
msql 连接池代码MysqlPoolUtil.scala
4)数据容错
流处理消费kafka都会考虑到数据丢失问题,一般可以保存到任何存储系统,包括mysql,hdfs,hbase,redis,zookeeper
等到。这里用SparkStreaming
自带的checkpoint
机制来实现应用重启时数据恢复。
checkpoint
这里采用的是checkpoint
机制,在重启或者失败后重启可以直接读取上次没有完成的任务,从kafka
对应offset
读取数据。
checkpoint是每天一个目录,在第二天凌晨定时销毁StreamingContext对象,重新统计计算pv,uv。
注意
ssc.stop(false,true)
表示优雅地销毁StreamingContext
对象,不能销毁SparkContext
对象,ssc.stop(true,true)
会停掉SparkContext
对象,程序就直接停了。
应用迁移或者程序升级
在这个过程中,我们把应用升级了一下,比如说某个功能写的不够完善,或者有逻辑错误,这时候都是需要修改代码,重新打jar包的,这时候如果把程序停了,新的应用还是会读取老的checkpoint
,可能会有两个问题:
- 执行的还是上一次的程序,因为
checkpoint
里面也有序列化的代码;- 直接执行失败,反序列化失败;
其实有时候,修改代码后不用删除checkpoint
也是可以直接生效,经过很多测试,我发现如果对数据的过滤操作导致数据过滤逻辑改变,还有状态操作保存修改,也会导致重启失败,只有删除checkpoint才行,可是实际中一旦删除checkpoint
,就会导致上一次未完成的任务和消费kafka
的offset
丢失,直接导致数据丢失,这种情况下我一般这么做。
这种情况一般是在另外一个集群,或者把
checkpoint
目录修改下,我们是代码与配置文件分离,所以修改配置文件checkpoint
的位置还是很方便的。然后两个程序一起跑,除了checkpoint
目录不一样,会重新建,都插入同一个数据库,跑一段时间后,把旧的程序停掉就好。以前看官网这么说,只能记住不能清楚明了,只有自己做时才会想一下办法去保证数据准确。
5)日志
日志用的log4j2
,本地保存一份,ERROR
级别的日志会通过邮件发送到邮箱。
3、主要代码
需要的maven
依赖:
读取配置文件代码ConfigFactory .java
:
主要业务代码,如下:
package com.js.ipflow.flash.helper
import java.sql.Connection
import java.util.{Calendar, Date}
import com.alibaba.fastjson.JSON
import com.js.ipflow.start.ConfigFactory
import com.js.ipflow.utils.{DateUtil, MysqlPoolUtil, RedisPoolUtil}
import kafka.serializer.StringDecoder
import org.apache.logging.log4j.LogManager
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import redis.clients.jedis.Jedis
object HelperHandle {
val logger = LogManager.getLogger(HelperHandle.getClass.getSimpleName)
// 邮件level=error日志
val logger2 = LogManager.getLogger("email")
def main(args: Array[String]): Unit = {
helperHandle(args(0))
}
def helperHandle(consumeRate: String): Unit = {
// 初始化配置文件
ConfigFactory.initConfig()
val conf = new SparkConf().setAppName(ConfigFactory.sparkstreamname)
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.kafka.maxRatePerPartition", consumeRate)
conf.set("spark.default.parallelism", "30")
val sc = new SparkContext(conf)
while (true) {
val ssc = StreamingContext.getOrCreate(ConfigFactory.checkpointdir + DateUtil.getDay(0), getStreamingContext _)
ssc.start()
ssc.awaitTerminationOrTimeout(resetTime)
ssc.stop(false, true)
}
def getStreamingContext(): StreamingContext = {
val stateSpec = StateSpec.function(mapFunction)
val ssc = new StreamingContext(sc, Seconds(ConfigFactory.sparkstreamseconds))
ssc.checkpoint(ConfigFactory.checkpointdir + DateUtil.getDay(0))
val zkQuorm = ConfigFactory.kafkazookeeper
val topics = ConfigFactory.kafkatopic
val topicSet = Set(topics)
val kafkaParams = Map[String, String](
"metadata.broker.list" -> (ConfigFactory.kafkaipport)
, "group.id" -> (ConfigFactory.kafkagroupid)
, "auto.offset.reset" -> kafka.api.OffsetRequest.LargestTimeString
)
val rmessage = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
)
// helper数据 (dateHour,guid,helperversion)
val helper_data = FilterHelper.getHelperData(rmessage.map(x => {
val message = JSON.parseObject(x._2).getString("message")
JSON.parseObject(message)
})).repartition(60).cache()
// (guid, datehour + helperversion)
val helper_data_dis = helper_data.map(x => (x._2, addTab(x._1) + x._3)).reduceByKey((x, y) => y)
// pv,uv
val helper_count = helper_data.map(x => (x._1, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
// helperversion
val helper_helperversion_count = helper_data.map(x => (addTab(x._1) + x._3, 1L)).mapWithState(stateSpec).stateSnapshots().repartition(2)
helper_data_dis.foreachRDD(rdd => {
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
try {
jedis = getJedis
eachPartition.foreach(x => {
val arr = x._2.split("\t")
val date: String = arr(0).split(":")(0)
// helper 统计
val key0 = "helper_" + date
jedis.sadd(key0, x._1)
jedis.expire(key0, ConfigFactory.rediskeyexists)
// helperversion 统计
val key = date + "_" + arr(1)
jedis.sadd(key, x._1)
jedis.expire(key, ConfigFactory.rediskeyexists)
})
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
}
})
})
insertHelper(helper_helperversion_count, "statistic_realtime_flash_helper", "date", "hour", "count_all", "count", "helperversion", "datehour", "dh")
insertHelper(helper_count, "statistic_realtime_helper_count", "date", "hour", "helper_count_all", "helper_count", "dh")
ssc
}
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// 计算当前时间距离次日零点的时长(毫秒)
def resetTime = {
val now = new Date()
val todayEnd = Calendar.getInstance
todayEnd.set(Calendar.HOUR_OF_DAY, 23) // Calendar.HOUR 12小时制
todayEnd.set(Calendar.MINUTE, 59)
todayEnd.set(Calendar.SECOND, 59)
todayEnd.set(Calendar.MILLISECOND, 999)
todayEnd.getTimeInMillis - now.getTime
}
/**
* 插入数据
*
* @param data (addTab(datehour)+helperversion)
* @param tbName
* @param colNames
*/
def insertHelper(data: DStream[(String, Long)], tbName: String, colNames: String*): Unit = {
data.foreachRDD(rdd => {
val tmp_rdd = rdd.map(x => x._1.substring(11, 13).toInt)
if (!rdd.isEmpty()) {
val hour_now = tmp_rdd.max() // 获取当前结果中最大的时间,在数据恢复中可以起作用
rdd.foreachPartition(eachPartition => {
var jedis: Jedis = null
var conn: Connection = null
try {
jedis = getJedis
conn = MysqlPoolUtil.getConnection()
conn.setAutoCommit(false)
val stmt = conn.createStatement()
eachPartition.foreach(x => {
if (colNames.length == 7) {
val datehour = x._1.split("\t")(0)
val helperversion = x._1.split("\t")(1)
val date_hour = datehour.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // count_all
val colName3 = colNames(3) // count
val colName4 = colNames(4) // helperversion
val colName5 = colNames(5) // datehour
val colName6 = colNames(6) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard(date + "_" + helperversion) // // 2018-07-08_10.0.1.22
val colValue4 = addYin(helperversion)
var colValue5 = if (hour < 10) "'" + date + " 0" + hour + ":00 " + helperversion + "'" else "'" + date + " " + hour + ":00 " + helperversion + "'"
val colValue6 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv只对现在更新
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
} /* else {
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName4},${colName5},${colName6}) values(${colValue0},${colValue1},${colValue2},${colValue4},${colValue5},${colValue6}) on duplicate key update ${colName2} = ${colValue2}"
}*/
} else if (colNames.length == 5) {
val date_hour = x._1.split(":")
val date = date_hour(0)
val hour = date_hour(1).toInt
val colName0 = colNames(0) // date
val colName1 = colNames(1) // hour
val colName2 = colNames(2) // helper_count_all
val colName3 = colNames(3) // helper_count
val colName4 = colNames(4) // dh
val colValue0 = addYin(date)
val colValue1 = hour
val colValue2 = x._2.toInt
val colValue3 = jedis.scard("helper_" + date) // // helper_2018-07-08
val colValue4 = if (hour < 10) "'" + date + " 0" + hour + ":00'" else "'" + date + " " + hour + ":00'"
var sql = ""
if (hour == hour_now) { // uv只对现在更新
sql = s"insert into ${tbName}(${colName0},${colName1},${colName2},${colName3},${colName4}) values(${colValue0},${colValue1},${colValue2},${colValue3},${colValue4}) on duplicate key update ${colName2} = ${colValue2},${colName3} = ${colValue3}"
logger.warn(sql)
stmt.addBatch(sql)
}
}
})
stmt.executeBatch() // 批量执行sql语句
conn.commit()
} catch {
case e: Exception => {
logger.error(e)
logger2.error(HelperHandle.getClass.getSimpleName + e)
}
} finally {
if (jedis != null) {
closeJedis(jedis)
}
if(conn != null){
conn.close()
}
}
})
}
})
}
def addYin(str: String): String = {
"'" + str + "'"
}
// 字符串添加tab格式化方法
def addTab(str: String): String = {
str + "\t";
}
// 实时流量状态更新函数
val mapFunction = (datehour: String, pv: Option[Long], state: State[Long]) => {
val accuSum = pv.getOrElse(0L) + state.getOption().getOrElse(0L)
val output = (datehour, accuSum)
state.update(accuSum)
output
}
// 获取jedis连接
def getJedis: Jedis = {
val jedis = RedisPoolUtil.getPool.getResource
jedis
}
// 释放jedis连接
def closeJedis(jedis: Jedis): Unit = {
RedisPoolUtil.getPool.returnResource(jedis)
}
}
作者:柯广的网络日志
微信公众号:Java大数据与数据仓库