Spark Advanced Sorting, Demystified
Published: 2019-06-21


  Sorting really matters!

  It wasn't covered in the earlier posts, but that doesn't mean it isn't important!

 

 

This post covers:

  1. Basic sorting in practice

  2. Secondary sort in practice

  3. Higher-dimensional sorting

  4. Inside Spark's sort internals


1. Basic sorting in practice

  Start the HDFS cluster:

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh


  Start the Spark cluster:

 

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh


  Start spark-shell:

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g


scala> sc.setLogLevel("WARN")    // suppress the INFO-level log noise

scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect


scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect
res2: Array[(String, Int)] = Array(("",67), (the,21), (Spark,14), (to,14), (for,12), (a,10), (and,10), (##,8), (run,7), (is,6), (on,6), (can,6), (of,5), (also,5), (in,5), (if,4), (or,4), (Hadoop,4), (with,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (particular,3), (documentation,3), (example,3), (an,3), (You,3), (building,3), (that,3), (guidance,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (SQL...

   From the output you can see that sortByKey(false) sorts by key in descending order (here the key is the count, because the pair is swapped before sorting and swapped back afterwards).
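  A side note (not from the original post): the same sort-by-count result can be written without swapping the pair twice, because RDD.sortBy takes an arbitrary key function plus an ascending flag. A minimal sketch against the same /README.md in the spark-shell above:

// Word count sorted by descending count, using sortBy instead of the
// map-swap / sortByKey / swap-back pattern shown above.
val counts = sc.textFile("/README.md")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)

// Like sortByKey, sortBy shuffles through a range-partitioned ShuffledRDD
// under the hood (it is implemented as keyBy(f).sortByKey(...).values).
val sortedByCount = counts.sortBy(pair => pair._2, ascending = false)
sortedByCount.collect().foreach(println)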


The sortByKey source:

/**
 * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
 * `collect` or `save` on the resulting RDD will return or output an ordered list of records
 * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
 * order of the keys).
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope
{
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}


  From this you can see that sorting always produces a ShuffledRDD: sortByKey builds a RangePartitioner and shuffles the data through it.
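  You can confirm this from the shell. A minimal sketch, assuming the same /README.md; the exact lineage text varies across Spark versions:

scala> val sorted = sc.textFile("/README.md").flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).sortByKey(false)

scala> sorted.toDebugString    // the printed lineage should include a ShuffledRDD produced by sortByKey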

 

  Two questions remain: why doesn't the ShuffledRDD show up directly in my shell output, and how does RangePartitioner actually order the keys? The second question is answered by the RangePartitioner source reading at the end of this post.

That wraps up the basic sorting walkthrough.


2. Secondary sort in practice

  A secondary sort means sorting on two dimensions at once.

   For example, sort the rows by the first column; when two rows have the same value in the first column, how should they be ordered? You then compare the second column as a tie-breaker. That second comparison is the secondary sort.

 

Setup

Input data file (helloSpark.txt):

2 3
4 1
3 2
4 3
8 7
2 1

 

Expected output:

2 1
2 3
3 2
4 1
4 3
8 7

  If you are aiming for a big company, be ready to sort on 5 or even 8 dimensions, not just 2. Keep at it, zhouls! (A generalized sketch follows below.)
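  As a hint for the multi-dimension case, here is a sketch of my own (the MultiColumnKey name and the commented usage line are illustrative, not from the original post): put all the columns into one key whose compare scans them left to right, so the same sortByKey call handles 2, 5, or 8 columns.

// Hypothetical N-column sort key: compares column by column, left to right.
class MultiColumnKey(val cols: Array[Int]) extends Ordered[MultiColumnKey] with Serializable {
  def compare(that: MultiColumnKey): Int = {
    var i = 0
    while (i < cols.length && i < that.cols.length) {
      val diff = this.cols(i) - that.cols(i)
      if (diff != 0) return diff        // the first differing column decides the order
      i += 1
    }
    cols.length - that.cols.length      // if all shared columns match, shorter rows sort first
  }
}

// Usage (illustrative): build the key from however many leading columns you sort on.
// val sorted = lines.map(l => (new MultiColumnKey(l.split(" ").map(_.toInt)), l)).sortByKey().map(_._2)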

 

 

   Here I'll use Scala IDE for Eclipse to write the code.


  The generated skeleton, SecondarySortKey.java:

package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

  private int first;
  private int second;

  @Override
  public boolean $greater(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  @Override
  public boolean $greater$eq(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  @Override
  public boolean $less(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  @Override
  public boolean $less$eq(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  @Override
  public int compare(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return 0;
  }

  @Override
  public int compareTo(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return 0;
  }
}


  A small IDE trick here: let Eclipse generate the unimplemented method stubs first.

  Then fill the stubs in with the logic we actually want.


The final SecondarySortKey.java:

package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

  private int first;
  private int second;

  // public constructor for the secondary-sort key
  public SecondarySortKey(int first, int second) {
    this.first = first;
    this.second = second;
  }

  public boolean $greater(SecondarySortKey other) {
    if (this.first > other.getFirst()) {
      return true;
    } else if (this.first == other.getFirst() && this.second > other.getSecond()) {
      return true;
    }
    return false;
  }

  public boolean $greater$eq(SecondarySortKey other) {
    if (this.$greater(other)) {
      return true;
    } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
      return true;
    }
    return false;
  }

  public boolean $less(SecondarySortKey other) {
    if (this.first < other.getFirst()) {
      return true;
    } else if (this.first == other.getFirst() && this.second < other.getSecond()) {
      return true;
    }
    return false;
  }

  public boolean $less$eq(SecondarySortKey other) {
    if (this.$less(other)) {
      return true;
    } else if (this.first == other.getFirst() && this.second == other.getSecond()) {
      return true;
    }
    return false;
  }

  // compare first by the first column, then by the second column
  public int compare(SecondarySortKey other) {
    if (this.first - other.getFirst() != 0) {
      return this.first - other.getFirst();
    } else {
      return this.second - other.getSecond();
    }
  }

  public int compareTo(SecondarySortKey other) {
    if (this.first - other.getFirst() != 0) {
      return this.first - other.getFirst();
    } else {
      return this.second - other.getSecond();
    }
  }

  public int getFirst() {
    return first;
  }

  public int getSecond() {
    return second;
  }
}


The complete SecondarySortApp.java:
package com.zhouls.spark.SparkApps.cores;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * Secondary sort, step by step:
 * Step 1: implement the Ordered and Serializable interfaces for a custom sort key
 * Step 2: load the file to be sorted into an RDD of <key, value> pairs
 * Step 3: call sortByKey on the custom key to perform the secondary sort
 * Step 4: strip the sort key and keep only the sorted result
 */
public class SecondarySortApp {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps Scala's SparkContext

    JavaRDD<String> lines = sc.textFile("D://SoftWare//spark-1.5.2-bin-hadoop2.6//helloSpark.txt");

    JavaPairRDD<SecondarySortKey, String> pairs = lines.mapToPair(
        new PairFunction<String, SecondarySortKey, String>() {
          private static final long serialVersionUID = 1L;

          @Override
          public Tuple2<SecondarySortKey, String> call(String line) throws Exception {
            String[] splited = line.split(" ");
            SecondarySortKey key = new SecondarySortKey(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
            return new Tuple2<SecondarySortKey, String>(key, line);
          }
        });

    JavaPairRDD<SecondarySortKey, String> sorted = pairs.sortByKey();

    // drop the custom key after sorting and keep only the sorted result
    JavaRDD<String> SecondaySorted = sorted.map(
        new Function<Tuple2<SecondarySortKey, String>, String>() {
          private static final long serialVersionUID = 1L;

          @Override
          public String call(Tuple2<SecondarySortKey, String> sortedContent) throws Exception {
            System.out.println("sortedContent._1 " + (sortedContent._1).toString());
            System.out.println("sortedContent._2 " + sortedContent._2);
            return sortedContent._2;
          }
        });

    SecondaySorted.foreach(new VoidFunction<String>() {
      @Override
      public void call(String sorted) throws Exception {
        System.out.println(sorted);
      }
    });
  }
}


   The Scala version of the same program:

 

 

package com.zhouls.spark.cores

/**
  * Created by Administrator on 2016/9/30.
  */
class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0) {
      return this.first - that.first
    } else {
      return this.second - that.second
    }
  }
}


package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/9/30.
  * Secondary sort, step by step:
  * Step 1: implement Ordered and Serializable for the custom sort key
  * Step 2: load the file to be sorted into an RDD of <key, value> pairs
  * Step 3: call sortByKey on the custom key to perform the secondary sort
  * Step 4: strip the sort key and keep only the sorted result
  */
object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local") // create SparkConf and initialize the app configuration
    val sc = new SparkContext(conf) // create SparkContext, the sole entry point for creating RDDs and the driver's gateway to the cluster
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt") // read the input file

    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    val pairWithSortKey = lines.map(line => (
      new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line
    ))
    val sorted = pairWithSortKey.sortByKey()
    val sortedResult = sorted.map(sortedLine => sortedLine._2)
    sortedResult.collect.foreach(println)
  }
}
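  An aside of my own (not part of the original post): for a key made of two plain Ints you can skip the custom key class entirely, because Scala ships an implicit Ordering for tuples that compares element by element, which is exactly a two-column secondary sort. A minimal sketch, assuming the spark-shell sc and the same input path:

// Secondary sort without a custom key class: use an (Int, Int) tuple as the key.
val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt")
val sortedResult = lines
  .map { line =>
    val cols = line.split(" ")
    ((cols(0).toInt, cols(1).toInt), line)   // (composite tuple key, original line)
  }
  .sortByKey()                               // uses the implicit Ordering[(Int, Int)]
  .map(_._2)                                 // drop the key, keep the sorted lines
sortedResult.collect.foreach(println)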


Homework:

1: Implement the secondary sort in Scala

The complete SecondarySortKey.scala:

class SecondarySortKey(val first: Int, val second: Int) extends Ordered[SecondarySortKey] with Serializable {
  override def compare(that: SecondarySortKey): Int = {
    if (this.first - that.first != 0) {
      return this.first - that.first
    } else {
      return this.second - that.second
    }
  }
}

object SecondarySortKey extends scala.AnyRef with Serializable {
  def apply(first: Int, second: Int): SecondarySortKey = {
    new SecondarySortKey(first, second)
  }
}

 

 

The complete SecondarySortApp.scala:

import org.apache.spark.{SparkConf, SparkContext}

object SecondarySortApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt")
    // val results = lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    val results = lines.map(line => (SecondarySortKey.apply(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
    results.collect.foreach(println)
  }
}
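  A note on the design: the companion object's apply only adds syntactic convenience. The call could equally be written SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt) with no new and no explicit .apply; the sorting behaviour is identical to the class-only version used earlier.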


2: Reading the RangePartitioner source:

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements.  See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License.  You may obtain a copy of the License at * *    http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.sparkimport java.io.{IOException, ObjectInputStream, ObjectOutputStream}import scala.collection.mutableimport scala.collection.mutable.ArrayBufferimport scala.reflect.{ClassTag, classTag}import scala.util.hashing.byteswap32import org.apache.spark.rdd.{PartitionPruningRDD, RDD}import org.apache.spark.serializer.JavaSerializerimport org.apache.spark.util.{CollectionsUtils, Utils}import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}/** * An object that defines how the elements in a key-value pair RDD are partitioned by key. * Maps each key to a partition ID, from 0 to `numPartitions - 1`. */abstract class Partitioner extends Serializable {  def numPartitions: Int  def getPartition(key: Any): Int}object Partitioner {  /**   * Choose a partitioner to use for a cogroup-like operation between a number of RDDs.   *   * If any of the RDDs already has a partitioner, choose that one.   *   * Otherwise, we use a default HashPartitioner. For the number of partitions, if   * spark.default.parallelism is set, then we'll use the value from SparkContext   * defaultParallelism, otherwise we'll use the max number of upstream partitions.   *   * Unless spark.default.parallelism is set, the number of partitions will be the   * same as the number of partitions in the largest upstream RDD, as this should   * be least likely to cause out-of-memory errors.   *   * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.   */  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {      return r.partitioner.get    }    if (rdd.context.conf.contains("spark.default.parallelism")) {      new HashPartitioner(rdd.context.defaultParallelism)    } else {      new HashPartitioner(bySize.head.partitions.size)    }  }}/** * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using * Java's `Object.hashCode`. * * Java arrays have hashCodes that are based on the arrays' identities rather than their contents, * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will * produce an unexpected or incorrect result. 
*/class HashPartitioner(partitions: Int) extends Partitioner {  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")  def numPartitions: Int = partitions  def getPartition(key: Any): Int = key match {    case null => 0    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)  }  override def equals(other: Any): Boolean = other match {    case h: HashPartitioner =>      h.numPartitions == numPartitions    case _ =>      false  }  override def hashCode: Int = numPartitions}/** * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly * equal ranges. The ranges are determined by sampling the content of the RDD passed in. * * Note that the actual number of partitions created by the RangePartitioner might not be the same * as the `partitions` parameter, in the case where the number of sampled records is less than * the value of `partitions`. */class RangePartitioner[K : Ordering : ClassTag, V](    @transient partitions: Int,    @transient rdd: RDD[_ <: Product2[K, V]],    private var ascending: Boolean = true)  extends Partitioner {  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")  private var ordering = implicitly[Ordering[K]]  // An array of upper bounds for the first (partitions - 1) partitions  private var rangeBounds: Array[K] = {    if (partitions <= 1) {      Array.empty    } else {      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.      val sampleSize = math.min(20.0 * partitions, 1e6)      // Assume the input partitions are roughly balanced and over-sample a little bit.      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)      if (numItems == 0L) {        Array.empty      } else {        // If a partition contains much more than the average number of items, we re-sample from it        // to ensure that enough items are collected from that partition.        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)        val candidates = ArrayBuffer.empty[(K, Float)]        val imbalancedPartitions = mutable.Set.empty[Int]        sketched.foreach { case (idx, n, sample) =>          if (fraction * n > sampleSizePerPartition) {            imbalancedPartitions += idx          } else {            // The weight is 1 over the sampling probability.            val weight = (n.toDouble / sample.size).toFloat            for (key <- sample) {              candidates += ((key, weight))            }          }        }        if (imbalancedPartitions.nonEmpty) {          // Re-sample imbalanced partitions with the desired sampling probability.          
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)          val seed = byteswap32(-rdd.id - 1)          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()          val weight = (1.0 / fraction).toFloat          candidates ++= reSampled.map(x => (x, weight))        }        RangePartitioner.determineBounds(candidates, partitions)      }    }  }  def numPartitions: Int = rangeBounds.length + 1  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]  def getPartition(key: Any): Int = {    val k = key.asInstanceOf[K]    var partition = 0    if (rangeBounds.length <= 128) {      // If we have less than 128 partitions naive search      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {        partition += 1      }    } else {      // Determine which binary search method to use only once.      partition = binarySearch(rangeBounds, k)      // binarySearch either returns the match location or -[insertion point]-1      if (partition < 0) {        partition = -partition-1      }      if (partition > rangeBounds.length) {        partition = rangeBounds.length      }    }    if (ascending) {      partition    } else {      rangeBounds.length - partition    }  }  override def equals(other: Any): Boolean = other match {    case r: RangePartitioner[_, _] =>      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending    case _ =>      false  }  override def hashCode(): Int = {    val prime = 31    var result = 1    var i = 0    while (i < rangeBounds.length) {      result = prime * result + rangeBounds(i).hashCode      i += 1    }    result = prime * result + ascending.hashCode    result  }  @throws(classOf[IOException])  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {    val sfactory = SparkEnv.get.serializer    sfactory match {      case js: JavaSerializer => out.defaultWriteObject()      case _ =>        out.writeBoolean(ascending)        out.writeObject(ordering)        out.writeObject(binarySearch)        val ser = sfactory.newInstance()        Utils.serializeViaNestedStream(out, ser) { stream =>          stream.writeObject(scala.reflect.classTag[Array[K]])          stream.writeObject(rangeBounds)        }    }  }  @throws(classOf[IOException])  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {    val sfactory = SparkEnv.get.serializer    sfactory match {      case js: JavaSerializer => in.defaultReadObject()      case _ =>        ascending = in.readBoolean()        ordering = in.readObject().asInstanceOf[Ordering[K]]        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]        val ser = sfactory.newInstance()        Utils.deserializeViaNestedStream(in, ser) { ds =>          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()          rangeBounds = ds.readObject[Array[K]]()        }    }  }}private[spark] object RangePartitioner {  /**   * Sketches the input RDD via reservoir sampling on each partition.   
*   * @param rdd the input RDD to sketch   * @param sampleSizePerPartition max sample size per partition   * @return (total number of items, an array of (partitionId, number of items, sample))   */  def sketch[K : ClassTag](      rdd: RDD[K],      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {    val shift = rdd.id    // val classTagK = classTag[K] // to avoid serializing the entire partitioner object    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>      val seed = byteswap32(idx ^ (shift << 16))      val (sample, n) = SamplingUtils.reservoirSampleAndCount(        iter, sampleSizePerPartition, seed)      Iterator((idx, n, sample))    }.collect()    val numItems = sketched.map(_._2.toLong).sum    (numItems, sketched)  }  /**   * Determines the bounds for range partitioning from candidates with weights indicating how many   * items each represents. Usually this is 1 over the probability used to sample this candidate.   *   * @param candidates unordered candidates with weights   * @param partitions number of partitions   * @return selected bounds   */  def determineBounds[K : Ordering : ClassTag](      candidates: ArrayBuffer[(K, Float)],      partitions: Int): Array[K] = {    val ordering = implicitly[Ordering[K]]    val ordered = candidates.sortBy(_._1)    val numCandidates = ordered.size    val sumWeights = ordered.map(_._2.toDouble).sum    val step = sumWeights / partitions    var cumWeight = 0.0    var target = step    val bounds = ArrayBuffer.empty[K]    var i = 0    var j = 0    var previousBound = Option.empty[K]    while ((i < numCandidates) && (j < partitions - 1)) {      val (key, weight) = ordered(i)      cumWeight += weight      if (cumWeight > target) {        // Skip duplicate values.        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {          bounds += key          target += step          j += 1          previousBound = Some(key)        }      }      i += 1    }    bounds.toArray  }}
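  To tie the source back to the earlier shell experiment, here is a sketch of my own (the data and the partition count are made up for illustration) that builds a RangePartitioner by hand and asks it where keys would land:

import org.apache.spark.RangePartitioner

// A small pair RDD with integer keys.
val pairs = sc.parallelize(1 to 100).map(i => (i, i.toString))

// Sample the keys and compute 4 roughly equal key ranges -- the same thing
// sortByKey does internally before shuffling.
val part = new RangePartitioner(4, pairs, true)   // true = ascending

println(part.numPartitions)       // may be less than 4 if too few distinct keys were sampled
println(part.getPartition(3))     // low keys land in the first partitions...
println(part.getPartition(95))    // ...high keys in the last ones

// partitionBy with a RangePartitioner, plus a sort inside each partition, is
// essentially what the ShuffledRDD returned by sortByKey gives you.
val ranged = pairs.partitionBy(part)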

 

 

This article is reposted from the cnblogs blog 大数据躺过的坑; original link: http://www.cnblogs.com/zlslch/p/5921572.html. Please contact the original author before republishing.

查看>>