一、map
map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新
RDD中都有且只有一个元素与之对应
@Test
public void testMap() {
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成一个测试rdd数据集
JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// 将rdd1中每一个元素都乘以2,生成新的rdd
// JavaRDD<Integer> rdd2 = rdd1.map(new MyFunction()); 方案一
JavaRDD<Integer> rdd2 = rdd1.map(v1 -> v1 * 2); //方案二
List<Integer> list = rdd2.collect();
list.forEach(integer -> System.out.println(integer));
jsc.stop();
}
二、filter
filter 是对RDD中的每个元素都执行一个指定的函数来过滤产生一个新的RDD。 任何原RDD中的元素在
新RDD中都有且只有一个元素与之对应。
@Test
public void testFilter(){
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成一个测试rdd数据集
JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
// 过滤数据,用大于2的数据组成一个新的rdd
JavaRDD<Integer> rdd2 = rdd1.filter(v1 -> v1 > 2);
List<Integer> list = rdd2.collect();
list.forEach(integer -> System.out.println(integer));
jsc.stop();
}
三、flatMap
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处
理后可生成多个元素来构建新RDD。 举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x
的值)
@Test
public void testFlatMap() {
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成RDD
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));
// 数据压扁操作
JavaRDD<String> rdd2 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
//收集数据
List<String> list = rdd2.collect();
for (String o : list) {
System.out.println(o);
}
}
四、mapPartitions
mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输
入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
@Test
public void testMapPartitions (){
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成RDD,分3个区存储
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10),3);
// 分区操作数据
JavaRDD<Integer> rdd2 = rdd.mapPartitions(integerIterator -> {
List<Integer> result = new ArrayList<>();
while (integerIterator.hasNext()){
Integer i = integerIterator.next();
result.add(i);
}
System.out.println("分区数据:" + result);
return result.iterator();
});
//收集数据
List<Integer> list = rdd2.collect();
list.forEach(integer -> System.out.println(integer));
}
五、mapToPair
此函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,调用f函数
后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象
@Test
public void testMapToPair (){
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成RDD
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));
// 按照空格分割
JavaRDD<String> rdd1 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
// 把每一个单词的出现数量标记为1
JavaPairRDD<Object, Object> rdd2 = rdd1.mapToPair(s -> new Tuple2<>(s, 1));
//收集数据
List<Tuple2<Object, Object>> list = rdd2.collect();
for (Tuple2<Object, Object> o : list) {
System.out.println(o);
}
}
六、reduceByKey
顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key
相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。
@Test
public void testReduceByKey (){
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成RDD
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("hello world","hello spark"));
// 按照空格分割
JavaRDD<String> rdd1 = rdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
// 把每一个单词的出现数量标记为1
JavaPairRDD<Object, Integer> rdd2 = rdd1.mapToPair(s -> new Tuple2<>(s, 1));
// 按照key相同的数进行相加操作
JavaPairRDD<Object, Integer> rdd3 = rdd2.reduceByKey((v1, v2) -> v1 + v2);
//收集数据
List<Tuple2<Object, Integer>> list = rdd3.collect();
for (Tuple2<Object, Integer> o : list) {
System.out.println(o);
}
}
七、coalesce
该函数用于将RDD进行重分区,使用HashPartitioner。第一个参数为重分区的数目,第二个为是否进
行shuffle,默认为false;
英
[ˌkəʊəˈles]
美
[ˌkoʊəˈles]
@Test
public void testCoalesce (){
SparkConf sparkConf = new SparkConf()
.setAppName("RddOperation")
.setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// 生成RDD
JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
System.out.println(rdd.getNumPartitions()); //4
// 重新分区,shuffle:是否重新分配存储,如果分区数大于原有分区数,就需要设置为true
JavaRDD<Integer> rdd2 = rdd.coalesce(6, true);
System.out.println(rdd2.getNumPartitions()); //2
//收集数据
List<Integer> list = rdd2.collect();
for (Integer o : list) {
System.out.println(o);
}
}