RDD是什么?
RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。
RDD内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。
五个特征:
dependencies:建立RDD的依赖关系,主要rdd之间是宽窄依赖的关系,具有窄依赖关系的rdd可以在同一个stage中进行计算。
partition:一个rdd会有若干个分区,分区的大小决定了对这个rdd计算的粒度,每个rdd的分区的计算都在一个单独的任务中进行。
preferedlocations:按照“移动数据不如移动计算”原则,在spark进行任务调度的时候,优先将任务分配到数据块存储的位置
compute:spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果。
partitioner:只存在于(K,V)类型的rdd中,非(K,V)类型的partitioner的值就是None。
rdd的算子action会触发真正的作业提交,而transformation算子是不会立即触发作业提交的。
在Spark中,所有RDD的转换都是是惰性求值的。RDD的转换操作transformation会生成新的RDD,新的RDD的数据依赖于原来的RDD的数据,每个RDD又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个RDD组成的有向无环图(DAG)。并通过在RDD上执行action动作将这个有向无环图作为一个Job提交给Spark执行
在DAG中又进行stage的划分,划分的依据是依赖算子是否是shuffle(如reduceByKey,Join等)的,每个stage又可以划分成若干task。接下来的事情就是driver发送task到executor,executor自己的线程池去执行这些task,完成之后将结果返回给driver。action算子是划分不同job的依据。
Spark对于有向无环图Job进行调度,确定阶段(Stage),分区(Partition),流水线(Pipeline),任务(Task)和缓存(Cache),进行优化,并在Spark集群上运行Job。RDD之间的依赖分为宽依赖(依赖多个分区)和窄依赖(只依赖一个分区),在确定阶段时,需要根据宽依赖shuffle划分阶段。根据分区划分任务。
Spark支持故障恢复的方式也不同,提供两种方式,Linage,通过数据的血缘关系,再执行一遍前面的处理,Checkpoint,将数据集存储到持久存储中。 Spark为迭代式数据处理提供更好的支持。每次迭代的数据可以保存在内存中,而不是写入文件
这里注意两个算子coalesce()和repartition()
coalesce
def coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
该函数用于将RDD进行重分区,使用HashPartitioner。 第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false。repartition
def repartition(numPartitions: Int): RDD[T]
该函数其实就是coalesce函数第二个参数为true的实现。使用注意
他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)
1)N < M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。 2)如果N > M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。 3)如果N > M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个stage中,就可能造成Spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。总之:如果shuff为false时,如果传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不经过shuffle,是无法将RDDde分区数变多的
参考:
更多RDD算子内容推荐参考
窄依赖和宽依赖
shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤. RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操作.宽依赖会发生 shuffle 操作. 窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片。
如下图所示:map就是一种窄依赖,而join则会导致宽依赖
如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操作,也就是说先按照key分组然后shuffle write的时候一个父分区对应一个子分区)则为第二类窄依赖
groupByKey和对输入未协同划分的join操作就是宽依赖,这是shuffle类操作。
细说:
首先,窄依赖允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。
其次,在窄依赖中,节点失败后的恢复更加高效。因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。
// Map: "cat" -> c, catval rdd1 = rdd.Map(x => (x.charAt(0), x))// groupby same key and countval rdd2 = rdd1.groupBy(x => x._1). Map(x => (x._1, x._2.toList.length))
第一个 Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化,第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作.shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.
根据是否发生 shuffle 操作能够将其分成如下的 stage 类型
(join 需要针对同一个 key 合并,所以需要 shuffle)
PS:shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗
参考:
一:DataFrame创建
SparkSQL可以以其他RDD对象、parquet文件、json文件、hive表,以及通过JDBC连接到其他关系型数据库作为数据源来生成DataFrame对象。
1)jdbc
【读】
postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb"dimDF = sqlContext.read.format('jdbc').options(url=postgresUrl,dbtable=tableName,user="root",password="root") .load()dimDF.registerTempTable(tmpTableName)
【写】
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB) self.postgresqlDatasource = { "url" : "jdbc:postgresql://" + self.postgresURL, "user" : self.postgresUser, "password" : self.postgresPwd } resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"], table=reportTable, mode='append', properties=postgresqlDatasource)
2)parquet
【读】
telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStrif( common.fileExist(telematicFilePath, self.sc) ): df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquetTELEMATIC_PARQUET_SCHEMA = SQLType.StructType([ SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True), SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True), SQLType.StructField('deviceId', SQLType.StringType(), True), SQLType.StructField('companyId', SQLType.StringType(), True)])
【写】
df.write.parquet(parquetPath, mode="overwrite")
3)json
df = sqlContext.read.json(path)
4)list列表
dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)
5)Rdd
if rddSchema is None: df = sqlContext.createDataFrame(rdd)else: df = sqlContext.createDataFrame(rdd, rddSchema)
rdd = sc.parallelize(resultList)df = self.sqlContext.createDataFrame(rdd)
二:Transform操作
三:Action操作
1、 collect() ,返回一个数组,包括dataframe集合所有的行
df = sqlContext.createDataFrame(parquetRecordList, PARQUET_FILE_SCHEMA)for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): filePath = "/user/spark/test.parquet/key=20171110" df.filter("key="+str(key)).drop("key").write.parquet(filePath, mode="append")
2、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合所有的行3、 count() 返回一个number类型的,返回dataframe集合的行数4、 toJson5、 first() 返回第一行 ,类型是row类型6、 head() 返回第一行 ,类型是row类型7、 head(n:Int)返回n行 ,类型是row 类型8、 show()返回dataframe集合的值 默认是20行,返回类型是unit9、 show(n:Int)返回n行,,返回值类型是unit10、table(n:Int) 返回n行 ,类型是row 类型
dataframe的基本操作1、 cache()同步数据的内存
data = self.sqlContext.sql(queryStr).toJSON().cache().collect()
2、 columns 返回一个string类型的数组,返回值是所有列的名字3、 dtypes返回一个string类型的二维数组,返回值是所有列的名字以及类型4、 explan()打印执行计划 物理的5、 toJSON 转换为json格式数据6、 isLocal 返回值是Boolean类型,如果允许模式是local返回true 否则返回false7、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型
稍后详解8、 printSchema() 打印出字段名称和类型 按照树状结构来打印9、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了10、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回11、 toDF()返回一个新的dataframe类型的12、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,13、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据14、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是一样的作用false 是去除RDD
集成查询:1、 agg(expers:column*) 返回dataframe类型 ,按每个device分组查最小时间
df = sqlContext.createDataFrame(tensRdd)resultDF = df.groupBy("device_id").agg({RegularDataEtlConstants.TIME: 'min'})resultDF.repartition(self._partitionNum).foreachPartition(lambda iterator: self.__saveToHBase(iterator))
startTime = df.filter((df.startTime != "") & (df.startTime >= minStartTimeCurrent)).agg({ "startTime": "min"}).collect()[0][0]
4、 apply(colName: String) 返回column类型,捕获输入进去列的对象5、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名6、 col(colName: String) 返回column类型,捕获输入进去列的对象7、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总8、 distinct 去重 返回一个dataframe类型9、 drop(col: Column) 删除某列 返回dataframe类型
columnList = ['key', 'type', 'timestamp', 'data']df = sqlContext.createDataFrame(dataList[index], columnList)for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): parquetPath = parquetList[index] + "/key=" + str(key) df.filter("key="+str(key)).drop("key").write.parquet(parquetPath, mode="append", partitionBy="type")
10、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe11、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其他集合不存在的
12、 explode[A, B](inputColumn: String, outputColumn: String)行转列
根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}
13、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型
df.filter("age>10").show(); df.filter(df("age")>10).show(); df.where(df("age")>10).show();
14、 groupBy(col1: String, cols: String*) 分组
dfgroupBy("age").avg().show();15、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素16、 join(right: DataFrame, joinExprs: Column, joinType: String)一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemidf.join(ds,df("name")===ds("name") and df("age")===ds("age"),"outer").show();17、 limit(n: Int) 返回dataframe类型 去n 条数据出来18、 na: DataFrameNaFunctions ,可以调用dataframenafunctions的功能区做过滤 df.na.drop().show(); 删除为空的行19、 orderBy(sortExprs: Column*) 做alise排序20、 select(cols:string*) dataframe 做字段的刷选 df.select($"colA", $"colB" + 1)21、 selectExpr(exprs: String*) 做字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();22、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc23、 unionAll(other:Dataframe) 合并
df = df.unionAll(dfTemp).coalesce(int(self.partitionNum))
24、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();25、 withColumn(colName: String, col: Column) 增加一列
往df
中新增一个名为aa的列,值与列name的一样
df.withColumn("aa",df("name")).show(); 将该列时间值计算加上时区偏移值mergeDF = mergeDF.withColumn("dm_transct_date_hr_key", functions.lit(self.__datehandle(mergeDF["dm_transct_date_hr_key"], self.timezoneOffset)))
http://blog.csdn.net/mtj66/article/details/52064827