博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
数据清洗
阅读量:5217 次
发布时间:2019-06-14

本文共 8404 字,大约阅读时间需要 28 分钟。

 

hadoop fs -rm -r -skipTrash /flumu //删除跳过垃圾回收站

 导入数据到HDFS

[kris@hadoop102 ~]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14把数据上传到ods层[kris@hadoop101 datas]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14
[kris@hadoop101 spark]$  bin/spark-submit --total-executor-cores 2 --class com.atguigu.data_monitor.GeneratorUserBehaviorMonitorData Online-Edu-1.0-SNAPSHOT.jar 2019-05-20[kris@hadoop101 spark]$ hadoop fs -mkdir -p /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20[kris@hadoop101 spark]$ hadoop fs -put *.txt /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-20[kris@hadoop101 spark]$ rm -r *.txt

 

 

数据清洗和加载

Hive字段如下:

uid STRING comment "用户唯一标识",username STRING comment "用户昵称",gender STRING comment "性别",level TINYINT comment "1代表小学,2代表初中,3代表高中",is_vip TINYINT comment "0代表不是会员,1代表是会员",os STRING comment "操作系统:os,android等",channel STRING comment "下载渠道:auto,toutiao,huawei",net_config STRING comment "当前网络类型",ip STRING comment "IP地址",phone STRING comment "手机号码",video_id INT comment "视频id",video_length INT comment "视频时长,单位秒",start_video_time BIGINT comment "开始看视频的时间缀,秒级",end_video_time BIGINT comment "退出视频时的时间缀,秒级",version STRING comment "版本",event_key STRING comment "事件类型",event_time BIGINT comment "事件发生时的时间缀,秒级"
View Code

1) 用SparkCore将数据清洗,清洗需求如下:

  a)  手机号脱敏:187xxxx2659

  b) 过滤重复行(重复条件,uid,event_key,event_time三者都相同即为重复)

  c)  最终数据保存到dwd.user_behavior分区表,以dt(天)为分区条件,表的文件存储格式为ORC,数据总量为xxxx条

整体流程(使用调度系统调度)

  a) SparkCore清洗数据,写入到/user/hive/warehouse/tmp.db/user_behavior_${day}目录

  b) 建立tmp.user_behavior_${day}临时表,并加载上面清洗后的数据

  c)  使用hive引擎,并用开窗函数row_number,将tmp.user_behavior_${day}表数据插入到dwd.user_behavior表中

  d) 删除tmp.user_behavior_${day}临时表

在IDEA中配置参数;在resources在拷贝core-site.xml、hdfs-site.xml、hive-site.xml文件

数据清洗代码实现:

/**  * 用户行为数据清洗  * 1、验证数据格式是否正确,切分后长度必须为17  * 2、手机号脱敏,格式为123xxxx4567  * 3、去掉username中带有的\n,否则导致写入HDFS时会换行  */object UserBehaviorCleaner {  def main(args: Array[String]): Unit = {    if (args.length != 2){      println("Usage:please input inputPath and outputPath")      System.exit(1)    }    val inputPaht = args(0)    val outputPath = args(1)    val conf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName) //.setMaster("local[*]")    val sparkContext = new SparkContext(conf)    // 通过输入路径获取RDD    val eventRDD: RDD[String] = sparkContext.textFile(inputPaht)    // 清洗数据,在算子中不要写大量业务逻辑,应该将逻辑封装到方法中    eventRDD.filter(event => checkEventValid(event)).map(event => maskPhone(event))      .map(event => repairUsername(event))      .coalesce(3)      .saveAsTextFile(outputPath)    sparkContext.stop()  }  def repairUsername(event:String)= {    val field: Array[String] = event.split("\t")    //取出用户昵称    val username: String = field(1)    if (username != null && "".equals(username)){      field(1) = username.replace("\n", "")    }    field.mkString("\t")  }  def maskPhone(event:String): String ={    var maskPhone = new StringBuilder    val fields: Array[String] = event.split("\t")    //取出手机号    var phone: String = fields(9)    // 手机号不为空时做掩码处理    if (phone != null && !"".equals(phone)){      maskPhone = maskPhone.append(phone.substring(0,3)).append("xxxx").append(phone.substring(7,11))      fields(9) = maskPhone.toString()    }    fields.mkString("\t")  }  def checkEventValid(event:String) ={    event.split("\t").length == 17  }}
View Code

               

    

    -DHADOOP_USER_NAME=kris -Dspark.master=local[2]

    输入路径:/user/hive/warehouse/ods.db/origin_user_behavior2019-05-14

    输出路径:/user/hive/warehouse/tmp.db/user_behavior20190514

Spark on yarn完整提交命令  输入:HDFS中ODS目录; 输出: HDFS中tmp目录

java -cp  mainClass需要指定全类名; java -jar 就不需要指定了,指定jar包即可

spark-submit --master yarn --deploy-mode cluster \--num-executors 8 \--executor-cores 4 \--executor-memory 12G \--class com.kris.user_behavior.UserBehaviorCleaner UserBehaviorCleaner.jar \hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/${day} \hdfs://hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior_${day}
View Code

 

创建临时--内部表:

路径为刚刚输入清洗后的输出路径,就可直接在Hive表中查询出数据;

 

创建内部表: //数据已经导入了;create database tmp;drop table if exists tmp.user_behavior20190514; create table if not exists tmp.user_behavior20190514(uid STRING comment "用户唯一标识",username STRING comment "用户昵称",gender STRING comment "性别",level TINYINT comment "1代表小学,2代表初中,3代表高中",is_vip TINYINT comment "0代表不是会员,1代表是会员",os STRING comment "操作系统:os,android等",channel STRING comment  "下载渠道:auto,toutiao,huawei",net_config STRING comment "当前网络类型",ip STRING comment "IP地址",phone STRING comment "手机号码",video_id INT comment "视频id",video_length INT comment "视频时长,单位秒",start_video_time BIGINT comment "开始看视频的时间缀,秒级",end_video_time BIGINT comment "退出视频时的时间缀,秒级",version STRING comment "版本",event_key STRING comment  "事件类型",event_time BIGINT comment  "事件发生时的时间缀,秒级")row format delimited fields terminated by "\t" location "/user/hive/warehouse/tmp.db/user_behavior20190514"; //数据所在的目录;

创建orc格式的外部表

.txt文件不能直接load成orc格式文件

Caused by: java.io.IOException: Malformed ORC file
原因:
ORC格式是列式存储的表,不能直接从本地文件导入数据,只有当数据源表也是ORC格式存储时,才可以直接加载,否则会出现上述报错。
解决办法:
要么将数据源表改为以ORC格式存储的表,要么新建一个以textfile格式的临时表先将源文件数据加载到该表,然后在从textfile表中insert数据到ORC目标表中。

通过insert overwrite将txt格式转换成orc格式;

create database dwd;drop table if exists dwd.user_behavior; create external table if not exists dwd.user_behavior(uid STRING comment "用户唯一标识",username STRING comment "用户昵称",gender STRING comment "性别",level TINYINT comment "1代表小学,2代表初中,3代表高中",is_vip TINYINT comment "0代表不是会员,1代表是会员",os STRING comment "操作系统:os,android等",channel STRING comment  "下载渠道:auto,toutiao,huawei",net_config STRING comment "当前网络类型",ip STRING comment "IP地址",phone STRING comment "手机号码",video_id INT comment "视频id",video_length INT comment "视频时长,单位秒",start_video_time BIGINT comment "开始看视频的时间缀,秒级",end_video_time BIGINT comment "退出视频时的时间缀,秒级",version STRING comment "版本",event_key STRING comment  "事件类型",event_time BIGINT comment  "事件发生时的时间缀,秒级") partitioned by(dt INT)  row format delimited fields terminated by "\t" stored as ORC

查询并在外部表中插入数据:

create table if not exists tmp.user_behavior20190307(uid STRING comment "用户唯一标识",username STRING comment "用户昵称",gender STRING comment "性别",level TINYINT comment "1代表小学,2代表初中,3代表高中",is_vip TINYINT comment "0代表不是会员,1代表是会员",os STRING comment "操作系统:os,android等",channel STRING comment  "下载渠道:auto,toutiao,huawei",net_config STRING comment "当前网络类型",ip STRING comment "IP地址",phone STRING comment "手机号码",video_id INT comment "视频id",video_length INT comment "视频时长,单位秒",start_video_time BIGINT comment "开始看视频的时间缀,秒级",end_video_time BIGINT comment "退出视频时的时间缀,秒级",version STRING comment "版本",event_key STRING comment  "事件类型",event_time BIGINT comment  "事件发生时的时间缀,秒级")row format delimited fields terminated by "\t" location "/user/hive/warehouse/tmp.db/user_behavior20190307"; insert overwrite table dwd.user_behavior partition(dt=20190307)    select uid,username,gender,level,is_vip,os,channel,net_config,ip,phone,video_id,    video_length,start_video_time,end_video_time,version,event_key,event_time from (select     uid,username,gender,level,is_vip,os,channel,net_config,ip,phone,video_id,    video_length,start_video_time,end_video_time,version,event_key,event_time,    row_number() OVER (PARTITION BY uid,event_key,event_time ORDER BY event_time) u_rank from tmp.user_behavior20190307) temp where u_rank = 1;

 

远程调试

在IDEA中进行调试:

       场景:以后工作中经常会遇到在本地执行没有问题,到了服务器跑的数据就是错误的

    IDEA设置:Run --> Edit Configurations添加Remote

 

[kris@hadoop101 spark]$ bin/spark-submit --master local[2] --driver-java-options "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=18888" --class com.atguigu.user_behavior.UserBehaviorCleaner Online-Edu-1.0-SNAPSHOT.jar hdfs://hadoop101:9000/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14 hdfs://hadoop101:9000/user/hive/warehouse/tmp.db/user_behavior20190514Listening for transport dt_socket at address: 18888

在IDEA中就可以进行EDBUG:

使用spark-shell进行调试

[kris@hadoop101 spark]$ bin/spark-shell --master local[2] --jars Online-Edu-1.0-SNAPSHOT.jar scala> sc.textFile("/user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14")res0: org.apache.spark.rdd.RDD[String] = /user/hive/warehouse/ods.db/origin_user_behavior/2019-05-14 MapPartitionsRDD[1] at textFile at 
:25scala> def checkEventValid(event:String) ={ | event.split("\t").length == 17 | }checkEventValid: (event: String)Boolean

 

转载于:https://www.cnblogs.com/shengyang17/p/10868914.html

你可能感兴趣的文章
windows自带的可生成各种数据库连接字符串工具打开方法
查看>>
Python命名规范
查看>>
滚动条
查看>>
程序员的自我修养九Windows下的动态链接
查看>>
Codeforces Round #361 (Div. 2)
查看>>
细说WebSocket - Node篇
查看>>
jenkins+testNG
查看>>
[洛谷1485] 火枪打怪
查看>>
PAT B1018.锤子剪刀布(20)
查看>>
Extjs控件之 grid打印功能
查看>>
枚举类型(不常用)递归
查看>>
ETL
查看>>
Tomcat源码分析(六)--日志记录器和国际化
查看>>
YII缓存依赖的应用
查看>>
决策树在机器学习的理论学习与实践
查看>>
Biee 11g权限详解
查看>>
minggw 安装
查看>>
Jquery操作cookie,实现简单的记住用户名的操作
查看>>
[BZOJ1196][HNOI2006]公路修建问题 二分答案+最小生成树
查看>>
PHP基础入门(二)
查看>>