RDD常用的算子操作

詹学伟
詹学伟
发布于 2024-04-21 / 3 阅读
0
0

RDD常用的算子操作

一、map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新
RDD中都有且只有一个元素与之对应

@Test
public void testMap() {
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // 生成一个测试rdd数据集
    JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // 将rdd1中每一个元素都乘以2,生成新的rdd
    // JavaRDD<Integer> rdd2 = rdd1.map(new MyFunction()); 方案一
    JavaRDD<Integer> rdd2 = rdd1.map(v1 -> v1 * 2); //方案二

    List<Integer> list = rdd2.collect();
    list.forEach(integer -> System.out.println(integer));

    jsc.stop();
}

二、filter

filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在
新RDD中都有且只有一个元素与之对应。

@Test
public void testFilter(){
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // 生成一个测试rdd数据集
    JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    // 过滤数据,用大于2的数据组成一个新的rdd
    JavaRDD<Integer> rdd2 = rdd1.filter(v1 -> v1 > 2);

    List<Integer> list = rdd2.collect();

    list.forEach(integer -> System.out.println(integer));

    jsc.stop();
}

三、flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处
理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x
的值)

@Test
public void testFlatMap() {
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // 生成RDD
    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));
    // 数据压扁操作
    JavaRDD<String> rdd2 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

    //收集数据
    List<String> list = rdd2.collect();
    for (String o : list) {
        System.out.println(o);
    }
}

四、mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输
入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。

@Test
public void testMapPartitions (){
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // 生成RDD,分3个区存储
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),3);
    // 分区操作数据
    JavaRDD<Integer> rdd2 = rdd.mapPartitions(integerIterator -> {
        List<Integer> result = new ArrayList<>();
        while (integerIterator.hasNext()){
            Integer i = integerIterator.next();
            result.add(i);
        }
        System.out.println("分区数据:" + result);
        return result.iterator();
    });

    //收集数据
    List<Integer> list = rdd2.collect();
    list.forEach(integer ->  System.out.println(integer));
}

五、mapToPair

此函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,调用f函数
后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象

@Test
public void testMapToPair (){
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // 生成RDD
    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));

    // 按照空格分割
    JavaRDD<String> rdd1 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

    // 把每一个单词的出现数量标记为1
    JavaPairRDD<Object, Object> rdd2 = rdd1.mapToPair(s -> new Tuple2<>(s, 1));

    //收集数据
    List<Tuple2<Object, Object>> list = rdd2.collect();
    for (Tuple2<Object, Object> o : list) {
        System.out.println(o);
    }
}

六、reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key
相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

@Test
public void testReduceByKey (){
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // 生成RDD
    JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));

    // 按照空格分割
    JavaRDD<String> rdd1 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());

    // 把每一个单词的出现数量标记为1
    JavaPairRDD<Object, Integer> rdd2 = rdd1.mapToPair(s -> new Tuple2<>(s, 1));

    // 按照key相同的数进行相加操作
    JavaPairRDD<Object, Integer> rdd3 = rdd2.reduceByKey((v1, v2) -> v1 + v2);

    //收集数据
    List<Tuple2<Object, Integer>> list = rdd3.collect();
    for (Tuple2<Object, Integer> o : list) {
        System.out.println(o);
    }
}

七、coalesce

该函数用于将RDD进行重分区,使用HashPartitioner。第一个参数为重分区的数目,第二个为是否进
行shuffle,默认为false;

[ˌkəʊəˈles]

[ˌkoʊəˈles]

@Test
public void testCoalesce (){
    SparkConf sparkConf = new SparkConf()
            .setAppName("RddOperation")
            .setMaster("local[*]");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    // 生成RDD
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));

    System.out.println(rdd.getNumPartitions()); //4
    // 重新分区,shuffle:是否重新分配存储,如果分区数大于原有分区数,就需要设置为true
    JavaRDD<Integer> rdd2 = rdd.coalesce(6, true);
    System.out.println(rdd2.getNumPartitions()); //2

    //收集数据
    List<Integer> list = rdd2.collect();
    for (Integer o : list) {
        System.out.println(o);
    }
}


评论