Spark Data Cleaning Example

詹学伟 · Published 2024-04-21

Prerequisites: a Hadoop cluster and a Spark cluster are already set up and running.

1. Create a Maven project

2. Add the Maven dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zxw.spark</groupId>
    <artifactId>zxw-spark</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- spark: keep in step with the cluster (step 4 submits to a Spark 3.3.0
             installation, which is built for Scala 2.12; a Scala 2.11 / Spark 2.4.x
             jar will not run against it) -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.3.0</version>
        </dependency>

        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.12.15</version>
        </dependency>

        <!-- fastjson: used to parse the JSON part of each log line -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
<!--                <configuration>-->
<!--                    <archive>-->
<!--                        <manifest>-->
<!--                            <addClasspath>true</addClasspath>-->
<!--                            <classpathPrefix>lib/</classpathPrefix>-->
<!--                            <mainClass>com.zxw.spark.WordCount</mainClass>-->
<!--                        </manifest>-->
<!--                    </archive>-->
<!--                </configuration>-->
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3. Write the code

  • Entity class:

package com.zxw.spark.pojo;


/**
 * Entity for one cleaned log record.
 *
 * @author admin
 * @version 1.0
 * @project zxw-spark
 * @date 2023/4/10 17:40:01
 */
public class LogEntity {

    /**
     * id
     */
    private String id;
    /**
     * client IP address
     */
    private String ip;
    /**
     * province
     */
    private String province;
    /**
     * request URL
     */
    private String url;
    /**
     * request method
     */
    private String method;
    /**
     * response status code
     */
    private Integer code;
    /**
     * record time
     */
    private String create_time;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getProvince() {
        return province;
    }

    public void setProvince(String province) {
        this.province = province;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getCreate_time() {
        return create_time;
    }

    public void setCreate_time(String create_time) {
        this.create_time = create_time;
    }

    @Override
    public String toString() {
        return "LogEntity{" +
                "id='" + id + '\'' +
                ", ip='" + ip + '\'' +
                ", province='" + province + '\'' +
                ", url='" + url + '\'' +
                ", method='" + method + '\'' +
                ", code=" + code +
                ", create_time='" + create_time + '\'' +
                '}';
    }
}
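
For reference, the entity assumes each raw line embeds one JSON object with exactly these fields. A hypothetical input line (the prefix format is an assumption; the job only extracts and parses the {...} part) might look like:

2023-04-18 14:36:30 INFO collector - {"id":"1001","ip":"220.181.108.1","province":"广东","url":"/api/login","method":"POST","code":200,"create_time":"2023-04-18 14:36:30"}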

  • Utility class:

package com.zxw.spark.utils;

import org.apache.commons.lang3.StringUtils;

/**
 * String helpers for locating and validating the JSON part of a log line.
 *
 * @author admin
 * @version 1.0
 * @project zxw-spark
 * @date 2023/4/12 15:58:47
 */
public class StrUtil {

    /**
     * Check whether the log line contains both "{" and "}".
     *
     * @param str target string
     * @return true if the string is non-blank and contains "{" and "}"
     */
    public static boolean isLegal(String str) {
        return StringUtils.isNotBlank(str) && str.contains("{") && str.contains("}");
    }

    /**
     * Extract the JSON part of the target string
     * (from the first "{" to the last "}").
     *
     * @param str target string
     * @return the JSON substring
     */
    public static String substring(String str) {
        int start = str.indexOf("{");
        int end = str.lastIndexOf("}");
        return str.substring(start, end + 1);
    }

    /**
     * Check whether the target string looks like JSON (object or array).
     *
     * @param str target string
     * @return true for "{...}" or "[...]"
     */
    public static boolean isJSON(String str) {
        return (str.startsWith("{") && str.endsWith("}"))
                || (str.startsWith("[") && str.endsWith("]"));
    }
}
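
A quick sanity check of the helpers, using the hypothetical line format shown above (the demo class and its values are illustrative only):

package com.zxw.spark.utils;

public class StrUtilDemo {

    public static void main(String[] args) {
        // hypothetical raw line: free-text prefix plus an embedded JSON object
        String line = "2023-04-18 14:36:30 INFO collector - {\"id\":\"1001\",\"code\":200}";

        System.out.println(StrUtil.isLegal(line));  // true: non-blank, contains "{" and "}"

        String json = StrUtil.substring(line);      // -> {"id":"1001","code":200}
        System.out.println(json);

        System.out.println(StrUtil.isJSON(json));   // true: starts with "{" and ends with "}"
        System.out.println(StrUtil.isLegal("   ")); // false: blank lines are rejected
    }
}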

  • Data cleaning job:

package com.zxw.spark;

import com.zxw.spark.pojo.LogEntity;
import com.zxw.spark.utils.StrUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.alibaba.fastjson.JSON;

/**
 * ETL job: extract the JSON object embedded in each raw log line and
 * emit the cleaned record as a CSV line.
 *
 * @author admin
 * @version 1.0
 * @project zxw-spark
 * @date 2023/4/18 14:36:32
 */
public class Etl {

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        /*
         * 1. Create the SparkConf object that holds the application's configuration.
         */
        SparkConf sparkConf = new SparkConf()
                // application name shown in the web UI
                .setAppName("etl")
                // local mode: [*] means "use all cores of this machine"
                // .setMaster("local[*]");
                .setMaster("spark://node3:7077");
        /*
         * 2. Create the SparkContext (JavaSparkContext for Java, SparkContext for Scala).
         * The SparkContext connects to the Spark cluster and is used to create RDDs,
         * accumulators and broadcast variables. The master setting decides which
         * TaskScheduler backend is built (the DAGScheduler sits above it):
         *  - setMaster("local[...]") uses the local scheduler backend;
         *  - setMaster("spark://...") uses the standalone scheduler backend, whose
         *    start() launches a client that connects to the cluster.
         */
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        // read the input file(s); args[0] may be a local or an HDFS path
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(args[0]);

        JavaRDD<String> mapJavaRDD = stringJavaRDD.map(new Function<String, String>() {
            @Override
            public String call(String line) throws Exception {
                // 2. parse one line: keep only lines that embed a JSON object
                if (!StrUtil.isLegal(line)) {
                    return "";
                }
                String substring = StrUtil.substring(line);
                if (!StrUtil.isJSON(substring)) {
                    return "";
                }

                LogEntity logEntity;
                try {
                    // parse with fastjson (the dependency the pom declares; the original
                    // used the legacy org.codehaus.jackson ObjectMapper, which the pom
                    // does not declare)
                    logEntity = JSON.parseObject(substring, LogEntity.class);
                } catch (Exception e) {
                    // a malformed record should be dropped, not kill the job
                    return "";
                }

                // emit the cleaned record as one CSV line
                return logEntity.getId() + "," +
                        logEntity.getIp() + "," +
                        logEntity.getProvince() + "," +
                        logEntity.getUrl() + "," +
                        logEntity.getMethod() + "," +
                        logEntity.getCode() + "," +
                        logEntity.getCreate_time();
            }
        });

        JavaRDD<String> filterJavaRDD = mapJavaRDD.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return !s.isEmpty();
            }
        });
        // cache: both count() and saveAsTextFile() trigger a job over this RDD
        filterJavaRDD.cache();
        // count() runs on the executors; collect().size() would pull the whole
        // dataset into the driver just to measure it
        long size = filterJavaRDD.count();
        filterJavaRDD.saveAsTextFile(args[1]);
        javaSparkContext.close();
        long end = System.currentTimeMillis();
        System.out.println("Total records: " + size);
        System.out.println("Elapsed: " + (end - start) / 1000 + "s");
    }
}
}
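
Since the build targets Java 8, the two anonymous Function classes can also be written as lambdas (Spark's org.apache.spark.api.java.function.Function is a functional interface). A minimal sketch of the same map/filter stages, equivalent to the code above (parse-error handling omitted for brevity):

JavaRDD<String> cleaned = stringJavaRDD
        .map(line -> {
            // keep only lines that embed a JSON object
            if (!StrUtil.isLegal(line)) {
                return "";
            }
            String json = StrUtil.substring(line);
            if (!StrUtil.isJSON(json)) {
                return "";
            }
            LogEntity log = JSON.parseObject(json, LogEntity.class);
            // join the fields into one CSV line
            return log.getId() + "," + log.getIp() + "," + log.getProvince() + "," +
                    log.getUrl() + "," + log.getMethod() + "," + log.getCode() + "," +
                    log.getCreate_time();
        })
        .filter(s -> !s.isEmpty());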

4. Notes

Local run mode: in the IDE, open Edit Configurations -> Program arguments and pass the two paths separated by a space; also switch Etl to the commented-out .setMaster("local[*]") line:

hdfs://node3:8020/sdk/20230418/logs-.* hdfs://node3:8020/data/spark

Cluster run mode: build the jar with Maven (mvn clean package produces target/zxw-spark-1.0.jar), upload it to the Spark master node, and submit:

 /usr/local/spark-3.3.0/bin/spark-submit --master spark://node3:7077 \
--class com.zxw.spark.Etl \
--executor-memory 1g \
--total-executor-cores 1 \
/opt/zxw-spark-1.0.jar hdfs://node3:8020/sdk/20230418/logs-.* hdfs://node3:8020/data/spark2
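
One caveat about the command above: configuration set explicitly on a SparkConf in code takes precedence over spark-submit flags, so the hardcoded .setMaster("spark://node3:7077") in Etl silently overrides --master. A more flexible pattern (a suggested sketch, not from the original code) leaves the master out of the source and supplies it at launch time:

SparkConf sparkConf = new SparkConf().setAppName("etl");
// the master now comes from the launcher:
//   spark-submit --master spark://node3:7077 ...   (cluster)
//   spark-submit --master "local[*]" ...           (local test)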

