spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh












spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh








spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 --executor-memory 1g



scala> sc.setLogLevel("WARN")    //过滤日志提醒
scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect





scala> sc.textFile("/README.md").flatMap(_.split(" ")).map(word =>(word,1)).reduceByKey(_+_,1).map(pair => (pair._2,pair._1)).sortByKey(false).map(pair => (pair._2,pair._1)).collect
res2: Array[(String, Int)] = Array(("",67), (the,21), (Spark,14), (to,14), (for,12), (a,10), (and,10), (##,8), (run,7), (is,6), (on,6), (can,6), (of,5), (also,5), (in,5), (if,4), (or,4), (Hadoop,4), (with,4), (you,4), (build,3), (including,3), (Please,3), (use,3), (particular,3), (documentation,3), (example,3), (an,3), (You,3), (building,3), (that,3), (guidance,3), (For,2), (This,2), (Hive,2), (To,2), (SparkPi,2), (refer,2), (Interactive,2), (be,2), (./bin/run-example,2), (1000:,2), (tests,2), (examples,2), (at,2), (using,2), (Shell,2), (class,2), (`examples`,2), (set,2), (Hadoop,,2), (cluster,2), (supports,2), (Python,2), (general,2), (locally,2), (following,2), (which,2), (should,2), ([project,2), (do,2), (how,2), (It,2), (Scala,2), (detailed,2), (return,2), (one,2), (Python,,2), (SQL...
scala>






/**
 * Orders this RDD by its keys so that every partition of the result holds one sorted range of
 * the elements. Invoking `collect` or `save` on the returned RDD therefore yields the records
 * in key order (with `save`, the records are spread over several `part-X` files in the
 * filesystem, and those files follow the order of the keys).
 *
 * @param ascending     `true` to use the key ordering as-is, `false` to use its reverse
 * @param numPartitions number of partitions handed to the range partitioner; defaults to the
 *                      partition count of this RDD
 * @return a shuffled RDD of the same key/value pairs with the chosen key ordering set
 */
// TODO: this currently doesn't work on P other than Tuple2!
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
    : RDD[(K, V)] = self.withScope {
  val rangePartitioner = new RangePartitioner(numPartitions, self, ascending)
  val keyOrdering = if (ascending) ordering else ordering.reverse
  new ShuffledRDD[K, V, V](self, rangePartitioner).setKeyOrdering(keyOrdering)
}




















2 3
4 1
3 2
4 3
8 7
2 1



2 1
2 3
3 2
4 1
4 3
8 7




   这里使用 Scala IDE for Eclipse 来编写。





package com.zhouls.spark.SparkApps.cores;

import java.io.Serializable;

import scala.math.Ordered;

/**
 * Skeleton of the custom key used for secondary sorting, as generated by the IDE.
 *
 * <p>The class implements Scala's {@code Ordered<SecondarySortKey>} (so the key can be compared
 * against other keys of the same type) and {@code Serializable}. Every comparison method is
 * still an auto-generated stub: the boolean comparisons return {@code false} and
 * {@code compare}/{@code compareTo} return {@code 0}. The stubs have to be filled in before the
 * key can actually order anything.
 */
public class SecondarySortKey implements Ordered<SecondarySortKey>, Serializable {

  // First and second components of the key; not yet read by the stub methods below.
  private int first;
  private int second;

  /** Stub for {@code this > arg0}; always returns {@code false}. */
  @Override
  public boolean $greater(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  /** Stub for {@code this >= arg0}; always returns {@code false}. */
  @Override
  public boolean $greater$eq(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  /** Stub for {@code this < arg0}; always returns {@code false}. */
  @Override
  public boolean $less(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  /** Stub for {@code this <= arg0}; always returns {@code false}. */
  @Override
  public boolean $less$eq(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return false;
  }

  /** Stub comparison; always returns {@code 0}. */
  @Override
  public int compare(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return 0;
  }

  /** Stub comparison; always returns {@code 0}. */
  @Override
  public int compareTo(SecondarySortKey arg0) {
    // TODO Auto-generated method stub
    return 0;
  }
}











package com.zhouls.spark.SparkApps.cores;import java.io.Serializable;import scala.math.Ordered;public class SecondarySortKey implements Ordered
,Serializable{  private int first;  private int second;  //二次排序的公开构造器  public SecondarySortKey(int first,int second){  this.first=first;  this.second=second;  }  public boolean $greater(SecondarySortKey other) {    if(this.first>other.getFirst()){      return true;    }else if(this.first==other.getFirst()&&this.second>other.getSecond()){      return true;  }      return false;  }  public boolean $greater$eq(SecondarySortKey other) {    if(this.$greater(other)){      return true;    }else if(this.first==other.getFirst()&&this.second==other.getSecond()){      return true;    }      return false;  }  public boolean $less(SecondarySortKey other) {    if(this.first












package com.zhouls.spark.SparkApps.cores;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;/** 二次排序,具体的实现步骤:* 第一步:安装Ordered和Serrializable接口实现自定义排序的key* 第二步:将要进行二次排序的文件加载进来
类型的RDD* 第三步:使用sortByKey基于自定义的Key进行二次排序* 第四步:去除掉排序的Key,只保留排序的结果*/public class SecondarySortApp {public static void main(String[] args) {SparkConf conf=new SparkConf().setAppName("SecondarySortApp").setMaster("local");JavaSparkContext sc=new JavaSparkContext(conf);//其底层实际上就是Scala的SparkContextJavaRDD
lines = sc.textFile("D://SoftWare//spark-1.5.2-bin-hadoop2.6//helloSpark.txt"); JavaPairRDD
pairs = lines.mapToPair(new PairFunction
() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2
call(String line) throws Exception {String[] splited = line.split(" ");SecondarySortKey key =new SecondarySortKey(Integer.valueOf(splited[0]),Integer.valueOf(splited[1]));return new Tuple2
sorted = pairs.sortByKey();//过滤掉排序后自定的Key,保留排序的结果JavaRDD
SecondaySorted=sorted.map(new Function
, String>() {private static final long serialVersionUID = 1L;@Overridepublic String call(Tuple2
sortedContent) throws Exception {System.out.println("sortedContent._1 "+(sortedContent._1).toString());System.out.println("sortedContent._2 "+sortedContent._2);return sortedContent._2;}});SecondaySorted.foreach(new VoidFunction
() {@Overridepublic void call(String sorted) throws Exception {System.out.println(sorted);}});}}










package com.zhouls.spark.cores

/**
  * Composite key for secondary sorting: keys are ordered by `first`, and keys with an equal
  * `first` are ordered by `second`.
  *
  * Created by Administrator on 2016/9/30.
  *
  * @param first  primary sort component
  * @param second secondary sort component, consulted only when `first` is equal
  */
class SecondarySortKey(val first: Int, val second: Int)
    extends Ordered[SecondarySortKey] with Serializable {

  /**
    * Compares this key with `that`.
    *
    * @param that the key to compare against
    * @return a negative value if this key sorts before `that`, a positive value if it sorts
    *         after, and 0 if both components are equal
    */
  override def compare(that: SecondarySortKey): Int = {
    // Integer.compare instead of subtraction: `a - b` overflows for operands that are far
    // apart (e.g. Int.MinValue - 1) and would then report the wrong sign.
    val byFirst = Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, that.second)
  }
}




package com.zhouls.spark.cores

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by Administrator on 2016/9/30.
  *
  * Secondary sort, implemented in four steps:
  *  1. Implement a custom sort key based on Ordered and Serializable.
  *  1. Load the file to be sorted into an RDD of (key, value) pairs.
  *  1. Use sortByKey to sort on the custom key.
  *  1. Drop the sort key and keep only the sorted lines.
  *
  * Each input line is split on a single space; its first two fields are converted with `toInt`
  * and become the two components of the sort key.
  */
object SecondarySortApp {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt"

  /**
    * Runs the secondary sort on a local master and prints the sorted lines.
    *
    * @param args optional: `args(0)` is the input file path; the default path is used when
    *             no argument is given
    */
  def main(args: Array[String]): Unit = {
    // Create the SparkConf that holds the application's configuration.
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    // Create the SparkContext, the driver's entry point for creating RDDs.
    val sc = new SparkContext(conf)
    try {
      val inputPath = args.headOption.getOrElse(DefaultInputPath)
      val lines = sc.textFile(inputPath) // read the file

      // Equivalent one-liner:
      //   lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt,
      //     line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
      val pairWithSortKey = lines.map { line =>
        val fields = line.split(" ") // split once and reuse for both key components
        (new SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
      }
      val sorted = pairWithSortKey.sortByKey()
      val sortedResult = sorted.map(sortedLine => sortedLine._2)
      sortedResult.collect.foreach(println)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}












/**
  * Composite key for secondary sorting: keys are ordered by `first`, and keys with an equal
  * `first` are ordered by `second`.
  *
  * @param first  primary sort component
  * @param second secondary sort component, consulted only when `first` is equal
  */
class SecondarySortKey(val first: Int, val second: Int)
    extends Ordered[SecondarySortKey] with Serializable {

  /**
    * Compares this key with `that`.
    *
    * @param that the key to compare against
    * @return a negative value if this key sorts before `that`, a positive value if it sorts
    *         after, and 0 if both components are equal
    */
  override def compare(that: SecondarySortKey): Int = {
    // Integer.compare instead of subtraction: `a - b` overflows for operands that are far
    // apart (e.g. Int.MinValue - 1) and would then report the wrong sign.
    val byFirst = Integer.compare(this.first, that.first)
    if (byFirst != 0) byFirst else Integer.compare(this.second, that.second)
  }
}

/** Companion providing a factory so keys can be built without `new`. */
object SecondarySortKey extends scala.AnyRef with Serializable {

  /**
    * Builds a key from its two components.
    *
    * @param first  primary sort component
    * @param second secondary sort component
    * @return a new [[SecondarySortKey]] holding the given components
    */
  def apply(first: Int, second: Int): SecondarySortKey =
    new SecondarySortKey(first, second)
}




/**
  * Secondary-sort driver that builds its keys through the `SecondarySortKey` companion factory.
  *
  * Each input line is split on a single space; its first two fields are converted with `toInt`
  * and become the two components of the sort key. The lines are then sorted by that key and
  * printed.
  */
object SecondarySortApp {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath = "D:\\SoftWare\\spark-1.5.2-bin-hadoop2.6\\helloSpark.txt"

  /**
    * Runs the secondary sort on a local master and prints the sorted lines.
    *
    * @param args optional: `args(0)` is the input file path; the default path is used when
    *             no argument is given
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val lines = sc.textFile(args.headOption.getOrElse(DefaultInputPath))

      // Equivalent form using the constructor directly:
      //   lines.map(line => (new SecondarySortKey(line.split(" ")(0).toInt,
      //     line.split(" ")(1).toInt), line)).sortByKey().map(pair => pair._2)
      val results = lines
        .map { line =>
          val fields = line.split(" ") // split once and reuse for both key components
          (SecondarySortKey(fields(0).toInt, fields(1).toInt), line)
        }
        .sortByKey()
        .map(pair => pair._2)
      results.collect.foreach(println)
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}






