前提:搭建好 Hadoop 集群环境和 Spark 集群环境
1.新建maven项目
2.导入maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zxw.spark</groupId>
<artifactId>zxw-spark</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.5</version>
</dependency>
<!-- scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.12</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<!-- <configuration>-->
<!-- <archive>-->
<!-- <manifest>-->
<!-- <addClasspath>true</addClasspath>-->
<!-- <classpathPrefix>lib/</classpathPrefix>-->
<!-- <mainClass>com.zxw.spark.WordCount</mainClass>-->
<!-- </manifest>-->
<!-- </archive>-->
<!-- </configuration>-->
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.编写代码
实体类:
package com.zxw.spark.pojo;
/**
* @author admin
* @version 1.0
* @project gckj-biz-server
* @description
* @date 2023/4/10 17:40:01
*/
public class LogEntity {
    // Record id.
    private String id;
    // Client IP address.
    private String ip;
    // Province resolved from the client IP.
    private String province;
    // Requested URL.
    private String url;
    // HTTP request method.
    private String method;
    // HTTP response status code.
    private Integer code;
    // Timestamp of the request.
    private String create_time;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getCreate_time() {
        return create_time;
    }

    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }

    @Override
    public String toString() {
        // StringBuilder produces the exact same text as the original
        // string concatenation (null fields render as "null").
        StringBuilder sb = new StringBuilder("LogEntity{");
        sb.append("id='").append(id).append('\'');
        sb.append(", ip='").append(ip).append('\'');
        sb.append(", province='").append(province).append('\'');
        sb.append(", url='").append(url).append('\'');
        sb.append(", method='").append(method).append('\'');
        sb.append(", code=").append(code);
        sb.append(", create_time='").append(create_time).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
工具类:
package com.zxw.spark.utils;
import org.apache.commons.lang3.StringUtils;
/**
* @author admin
* @version 1.0
* @project example-mr
* @description
* @date 2023/4/12 15:58:47
*/
public class StrUtil {

    /** Utility class: not instantiable. */
    private StrUtil() {
    }

    /**
     * Checks that a log line can contain a JSON object, i.e. it is non-null
     * and contains both "{" and "}".
     *
     * <p>The previous commons-lang {@code isNotBlank} check was redundant:
     * a string containing "{" is necessarily non-blank, so only the null
     * check is kept (same behavior, one fewer dependency here).
     *
     * @param str target string (may be null)
     * @return true if the string contains both "{" and "}"
     */
    public static boolean isLegal(String str) {
        return str != null && str.contains("{") && str.contains("}");
    }

    /**
     * Extracts the JSON part between the first "{" and the last "}".
     *
     * @param str target string (must be non-null)
     * @return the JSON substring, or "" when braces are missing or reversed
     */
    public static String substring(String str) {
        int start = str.indexOf('{');
        int end = str.lastIndexOf('}');
        // Guard against missing or reversed braces (e.g. "}x{"), which
        // previously threw StringIndexOutOfBoundsException even though
        // isLegal() accepts such input.
        if (start < 0 || end < start) {
            return "";
        }
        return str.substring(start, end + 1);
    }

    /**
     * Roughly determines whether a string looks like JSON (object or array).
     *
     * @param str target string (may be null)
     * @return true if it starts/ends with matching braces or brackets
     */
    public static boolean isJSON(String str) {
        // Previously threw NullPointerException on null input.
        if (str == null) {
            return false;
        }
        return (str.startsWith("{") && str.endsWith("}"))
                || (str.startsWith("[") && str.endsWith("]"));
    }
}
数据清洗:
package com.zxw.spark;
import com.zxw.spark.pojo.LogEntity;
import com.zxw.spark.utils.StrUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.codehaus.jackson.map.ObjectMapper;
/**
* @author admin
* @version 1.0
* @project zxw-spark
* @description
* @date 2023/4/18 14:36:32
*/
public class Etl {

    /**
     * ETL entry point: reads raw log lines from args[0], extracts the
     * embedded JSON payload from each line, flattens it into a CSV row
     * and writes the result to args[1].
     *
     * @param args args[0] = input path, args[1] = output path
     */
    public static void main(String[] args) {
        // Validate arguments before use — previously args[0]/args[1] were
        // read unchecked and a missing argument crashed with AIOOBE.
        if (args.length < 2) {
            System.err.println("Usage: Etl <inputPath> <outputPath>");
            System.exit(1);
        }
        long start = System.currentTimeMillis();
        /*
         * 1. SparkConf holds the application configuration.
         */
        SparkConf sparkConf = new SparkConf()
                // Application name shown in the Spark UI.
                .setAppName("wordCount_v2")
                // Local debug mode: .setMaster("local[*]") ([*] = all local cores)
                .setMaster("spark://node3:7077");
        /*
         * 2. JavaSparkContext (SparkContext for Scala) connects to the
         *    cluster and is the factory for RDDs, accumulators and
         *    broadcast variables. The master setting decides the backend:
         *    "local" uses a local scheduler, "spark://..." starts a client
         *    that connects to the standalone cluster.
         */
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // Read input lines (HDFS or local path; glob patterns allowed).
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(args[0]);
        // Parse each line; lines that cannot be parsed map to the "" sentinel.
        JavaRDD<String> mapJavaRDD = stringJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                // Skip lines that cannot contain a JSON object.
                if (!StrUtil.isLegal(line)) {
                    return "";
                }
                String json = StrUtil.substring(line);
                try {
                    // ObjectMapper is created per record to avoid closure
                    // serialization issues when shipping to executors.
                    ObjectMapper om = new ObjectMapper();
                    LogEntity logEntity = om.readValue(json, LogEntity.class);
                    return logEntity.getId() + ","
                            + logEntity.getIp() + ","
                            + logEntity.getProvince() + ","
                            + logEntity.getUrl() + ","
                            + logEntity.getMethod() + ","
                            + logEntity.getCode() + ","
                            + logEntity.getCreate_time();
                } catch (Exception e) {
                    // Malformed JSON: drop this line instead of failing the
                    // whole job (previously any bad record killed the task).
                    return "";
                }
            }
        });
        // Drop the "" placeholders produced for invalid lines.
        JavaRDD<String> filterJavaRDD = mapJavaRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return !s.isEmpty();
            }
        });
        // The RDD is consumed twice (count + save): cache it so the input
        // is not re-read and re-parsed for the second action.
        filterJavaRDD.cache();
        // count() replaces collect().size(), which pulled the entire
        // dataset into driver memory just to measure its size.
        long size = filterJavaRDD.count();
        filterJavaRDD.saveAsTextFile(args[1]);
        javaSparkContext.close();
        long end = System.currentTimeMillis();
        // (The previous unlabeled println of the raw end timestamp was removed.)
        System.out.println("总数据:" + size);
        long time = (end - start) / 1000;
        System.out.println("耗时:" + time);
    }
}
4.说明
本地运行模式:edit configuration->设置参数(空格隔开)
hdfs://node3:8020/sdk/20230418/logs-.* hdfs://node3:8020/data/spark
集群运行模式:maven打成jar包,上传至spark master节点(注意:pom 中声明的 spark-core_2.11:2.4.5 与下方命令所用的 spark-3.3.0 集群版本不一致,需保持依赖版本与集群版本兼容),
/usr/local/spark-3.3.0/bin/spark-submit --master spark://node3:7077 \
--class com.zxw.spark.Etl \
--executor-memory 1g \
--total-executor-cores 1 \
/opt/zxw-spark-1.0.jar hdfs://node3:8020/sdk/20230418/logs-.* hdfs://node3:8020/data/spark2