Lesson 64: Study Notes on the Internals of Parquet Data Splitting and Compression in SparkSQL

2024-01-09 19:08


In this lesson:

1  Parquet data splitting in SparkSQL

2  Parquet data compression in SparkSQL

 

The walkthrough uses the SparkSQL Parquet example from the official Spark website:

Schema Merging

Like ProtocolBuffer, Avro, and Thrift, Parquet also supports schema evolution. Users can start with a simple schema, and gradually add more columns to the schema as needed. In this way, users may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source is now able to automatically detect this case and merge schemas of all these files.

 

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we turned it off by default starting from 1.5.0. You may enable it by

 

setting data source option mergeSchema to true when reading Parquet files (as shown in the examples below), or

setting the global SQL option spark.sql.parquet.mergeSchema to true.

// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory

val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column

val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// Read the partitioned table

val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")

df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
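The merge result in the comment above can be modelled without Spark. The following is a minimal sketch in plain Scala, not Spark's actual implementation: `Field`, `merge`, and the type names are hypothetical stand-ins, and the assumption is that merging keeps the first schema's columns in order, appends columns that are new, and rejects a same-named column whose type differs.

```scala
object SchemaMergeSketch {
  // Hypothetical stand-in for one Parquet column: a name and a type name.
  final case class Field(name: String, dataType: String)

  // Keep the left-hand fields in order, then append right-hand fields
  // whose names are not present yet. A same-named field with a different
  // type is treated as incompatible (an assumption of this sketch).
  def merge(left: Seq[Field], right: Seq[Field]): Seq[Field] = {
    val leftByName = left.map(f => f.name -> f).toMap
    right.foreach { f =>
      leftByName.get(f.name).foreach { existing =>
        require(existing.dataType == f.dataType,
          s"Incompatible types for column ${f.name}")
      }
    }
    left ++ right.filterNot(f => leftByName.contains(f.name))
  }

  // Schemas of the files under key=1 and key=2 in the example.
  val key1: Seq[Field] = Seq(Field("single", "int"), Field("double", "int"))
  val key2: Seq[Field] = Seq(Field("single", "int"), Field("triple", "int"))

  // The partition column taken from the directory names is appended last.
  val merged: Seq[Field] = merge(key1, key2) :+ Field("key", "int")
}
```

Under these assumptions, `SchemaMergeSketch.merged.map(_.name)` yields the same column order as the printed schema: single, double, triple, key.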

 

 

Actual run output (this run wrote to the directory text_table rather than test_table):

scala> val df1 = sc.makeRDD(1 to 5).map(i => (i,i * 2)).toDF("single","double")

df1: org.apache.spark.sql.DataFrame = [single: int, double: int]

 

scala> df1.write.parquet("data/text_table/key=1")

16/04/02 04:27:07 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/04/02 04:27:07 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/04/02 04:27:07 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/04/02 04:27:07 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/04/02 04:27:07 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/04/02 04:27:07 INFO parquet.ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

16/04/02 04:27:07 INFO datasources.DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter

16/04/02 04:27:09 INFO spark.SparkContext: Starting job: parquet at <console>:33

16/04/02 04:27:09 INFO scheduler.DAGScheduler: Got job 0 (parquet at <console>:33) with 3 output partitions

16/04/02 04:27:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (parquet at <console>:33)

16/04/02 04:27:09 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/04/02 04:27:09 INFO scheduler.DAGScheduler: Missing parents: List()

16/04/02 04:27:09 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[3] at parquet at <console>:33), which has no missing parents

16/04/02 04:27:12 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 68.0 KB, free 68.0 KB)

16/04/02 04:27:12 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 24.6 KB, free 92.5 KB)

16/04/02 04:27:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.121:56069 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:27:12 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006

16/04/02 04:27:12 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[3] at parquet at <console>:33)

16/04/02 04:27:12 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 3 tasks

16/04/02 04:27:13 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, slq1, partition 0,PROCESS_LOCAL, 2078 bytes)

16/04/02 04:27:13 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, slq2, partition 1,PROCESS_LOCAL, 2078 bytes)

16/04/02 04:27:13 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, slq3, partition 2,PROCESS_LOCAL, 2135 bytes)

16/04/02 04:27:17 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slq2:44836 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:27:17 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slq3:53765 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:27:18 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on slq1:44043 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:28:13 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 60174 ms on slq3 (1/3)

16/04/02 04:28:16 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 62700 ms on slq2 (2/3)

16/04/02 04:28:27 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 74088 ms on slq1 (3/3)

16/04/02 04:28:27 INFO scheduler.DAGScheduler: ResultStage 0 (parquet at <console>:33) finished in 74.105 s

16/04/02 04:28:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/04/02 04:28:27 INFO scheduler.DAGScheduler: Job 0 finished: parquet at <console>:33, took 78.540234 s

16/04/02 04:28:29 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

16/04/02 04:28:35 INFO datasources.DefaultWriterContainer: Job job_201604020427_0000 committed.

16/04/02 04:28:36 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=1 on driver

16/04/02 04:28:36 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=1 on driver

 

scala> 16/04/02 04:39:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on slq2:44836 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:39:10 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.121:56069 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:39:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on slq3:53765 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:39:11 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on slq1:44043 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:39:11 INFO spark.ContextCleaner: Cleaned accumulator 3

16/04/02 04:39:11 INFO spark.ContextCleaner: Cleaned accumulator 2

 

 

scala> val df2 = sc.makeRDD(6 to 10).map(i => (i,i * 3)).toDF("single","triple")

df2: org.apache.spark.sql.DataFrame = [single: int, triple: int]

 

scala> df2.write.parquet("data/text_table/key=2")

16/04/02 04:56:13 INFO parquet.ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

16/04/02 04:56:13 INFO datasources.DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter

16/04/02 04:56:14 INFO spark.SparkContext: Starting job: parquet at <console>:33

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Got job 1 (parquet at <console>:33) with 3 output partitions

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Final stage: ResultStage 1 (parquet at <console>:33)

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Missing parents: List()

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[14] at parquet at <console>:33), which has no missing parents

16/04/02 04:56:14 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 68.0 KB, free 68.0 KB)

16/04/02 04:56:14 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 24.6 KB, free 92.5 KB)

16/04/02 04:56:14 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.121:56069 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:56:14 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/04/02 04:56:14 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (MapPartitionsRDD[14] at parquet at <console>:33)

16/04/02 04:56:14 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 3 tasks

16/04/02 04:56:14 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, slq1, partition 0,PROCESS_LOCAL, 2078 bytes)

16/04/02 04:56:14 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, slq2, partition 1,PROCESS_LOCAL, 2078 bytes)

16/04/02 04:56:14 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, slq3, partition 2,PROCESS_LOCAL, 2135 bytes)

16/04/02 04:56:15 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slq3:53765 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:56:15 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slq2:44836 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:56:15 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on slq1:44043 (size: 24.6 KB, free: 517.4 MB)

16/04/02 04:56:16 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 1472 ms on slq2 (1/3)

16/04/02 04:56:16 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 1486 ms on slq3 (2/3)

16/04/02 04:56:16 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 2093 ms on slq1 (3/3)

16/04/02 04:56:16 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/04/02 04:56:16 INFO scheduler.DAGScheduler: ResultStage 1 (parquet at <console>:33) finished in 2.095 s

16/04/02 04:56:16 INFO scheduler.DAGScheduler: Job 1 finished: parquet at <console>:33, took 2.673089 s

16/04/02 04:56:17 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5

16/04/02 04:56:18 INFO datasources.DefaultWriterContainer: Job job_201604020456_0000 committed.

16/04/02 04:56:18 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=2 on driver

16/04/02 04:56:18 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=2 on driver

 

scala> val df3 = sqlContext.read.option("mergeSchema","true").parquet("data/text_table")

16/04/02 05:00:59 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table on driver

16/04/02 05:00:59 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=1 on driver

16/04/02 05:00:59 INFO parquet.ParquetRelation: Listing hdfs://slq1:9000/user/richard/data/text_table/key=2 on driver

16/04/02 05:01:00 INFO spark.SparkContext: Starting job: parquet at <console>:28

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Got job 2 (parquet at <console>:28) with 3 output partitions

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (parquet at <console>:28)

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Missing parents: List()

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[17] at parquet at <console>:28), which has no missing parents

16/04/02 05:01:00 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 61.7 KB, free 154.2 KB)

16/04/02 05:01:00 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 21.0 KB, free 175.2 KB)

16/04/02 05:01:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.1.121:56069 (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:01:00 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/04/02 05:01:00 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 2 (MapPartitionsRDD[17] at parquet at <console>:28)

16/04/02 05:01:00 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 3 tasks

16/04/02 05:01:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 6, slq3, partition 0,PROCESS_LOCAL, 2524 bytes)

16/04/02 05:01:00 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 7, slq1, partition 1,PROCESS_LOCAL, 2524 bytes)

16/04/02 05:01:00 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 2.0 (TID 8, slq2, partition 2,PROCESS_LOCAL, 2469 bytes)

16/04/02 05:01:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on slq2:44836 (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:01:00 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on slq3:53765 (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:01:01 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on slq1:44043 (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:01:02 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 6) in 1697 ms on slq3 (1/3)

16/04/02 05:01:02 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 2.0 (TID 8) in 2189 ms on slq2 (2/3)

16/04/02 05:01:05 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 2.0 (TID 7) in 4740 ms on slq1 (3/3)

16/04/02 05:01:05 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool

16/04/02 05:01:05 INFO scheduler.DAGScheduler: ResultStage 2 (parquet at <console>:28) finished in 4.804 s

16/04/02 05:01:05 INFO scheduler.DAGScheduler: Job 2 finished: parquet at <console>:28, took 5.169726 s

df3: org.apache.spark.sql.DataFrame = [single: int, double: int, triple: int, key: int]

 

scala> df3.printSchema()

root

 |-- single: integer (nullable = true)

 |-- double: integer (nullable = true)

 |-- triple: integer (nullable = true)

 |-- key: integer (nullable = true)

 

 

scala> df3.show()

16/04/02 05:03:35 INFO datasources.DataSourceStrategy: Selected 2 partitions out of 2, pruned 0.0% partitions.

16/04/02 05:03:36 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 62.4 KB, free 237.6 KB)

16/04/02 05:03:36 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 19.7 KB, free 257.3 KB)

16/04/02 05:03:36 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on 192.168.1.121:56069 (size: 19.7 KB, free: 517.3 MB)

16/04/02 05:03:36 INFO spark.SparkContext: Created broadcast 3 from show at <console>:31

16/04/02 05:03:38 INFO Configuration.deprecation: mapred.min.split.size is deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize

16/04/02 05:03:38 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://slq1:9000/user/richard/data/text_table/key=2/part-r-00000-2f220b3f-43a1-4093-ad51-1d3af7707ca8.gz.parquet, hdfs://slq1:9000/user/richard/data/text_table/key=2/part-r-00001-2f220b3f-43a1-4093-ad51-1d3af7707ca8.gz.parquet, hdfs://slq1:9000/user/richard/data/text_table/key=2/part-r-00002-2f220b3f-43a1-4093-ad51-1d3af7707ca8.gz.parquet

16/04/02 05:03:38 INFO parquet.ParquetRelation: Reading Parquet file(s) from hdfs://slq1:9000/user/richard/data/text_table/key=1/part-r-00000-f6a15341-401e-41b0-8f8a-acbf97ce42fb.gz.parquet, hdfs://slq1:9000/user/richard/data/text_table/key=1/part-r-00001-f6a15341-401e-41b0-8f8a-acbf97ce42fb.gz.parquet, hdfs://slq1:9000/user/richard/data/text_table/key=1/part-r-00002-f6a15341-401e-41b0-8f8a-acbf97ce42fb.gz.parquet

16/04/02 05:03:38 INFO spark.SparkContext: Starting job: show at <console>:31

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Got job 3 (show at <console>:31) with 1 output partitions

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 (show at <console>:31)

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Missing parents: List()

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[24] at show at <console>:31), which has no missing parents

16/04/02 05:03:38 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 7.1 KB, free 264.4 KB)

16/04/02 05:03:38 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 3.9 KB, free 268.4 KB)

16/04/02 05:03:38 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on 192.168.1.121:56069 (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:03:38 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:1006

16/04/02 05:03:38 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[24] at show at <console>:31)

16/04/02 05:03:38 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks

16/04/02 05:03:39 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 9, slq1, partition 0,NODE_LOCAL, 2353 bytes)

16/04/02 05:03:39 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on slq1:44043 (size: 3.9 KB, free: 517.4 MB)

16/04/02 05:03:39 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on slq1:44043 (size: 19.7 KB, free: 517.3 MB)

16/04/02 05:03:44 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 9) in 5898 ms on slq1 (1/1)

16/04/02 05:03:44 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

16/04/02 05:03:44 INFO scheduler.DAGScheduler: ResultStage 3 (show at <console>:31) finished in 5.901 s

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Job 3 finished: show at <console>:31, took 6.358506 s

16/04/02 05:03:44 INFO spark.SparkContext: Starting job: show at <console>:31

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Got job 4 (show at <console>:31) with 5 output partitions

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (show at <console>:31)

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Parents of final stage: List()

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Missing parents: List()

16/04/02 05:03:44 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[24] at show at <console>:31), which has no missing parents

16/04/02 05:03:45 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 7.1 KB, free 275.4 KB)

16/04/02 05:03:45 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 3.9 KB, free 279.4 KB)

16/04/02 05:03:45 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on 192.168.1.121:56069 (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:03:45 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006

16/04/02 05:03:45 INFO scheduler.DAGScheduler: Submitting 5 missing tasks from ResultStage 4 (MapPartitionsRDD[24] at show at <console>:31)

16/04/02 05:03:45 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 5 tasks

16/04/02 05:03:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 10, slq3, partition 1,NODE_LOCAL, 2354 bytes)

16/04/02 05:03:45 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 11, slq1, partition 2,NODE_LOCAL, 2354 bytes)

16/04/02 05:03:45 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 4.0 (TID 12, slq2, partition 3,NODE_LOCAL, 2353 bytes)

16/04/02 05:03:45 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on slq1:44043 (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:03:45 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on slq2:44836 (size: 3.9 KB, free: 517.4 MB)

16/04/02 05:03:45 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on slq3:53765 (size: 3.9 KB, free: 517.4 MB)

16/04/02 05:03:45 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on slq3:53765 (size: 19.7 KB, free: 517.3 MB)

16/04/02 05:03:46 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 4.0 (TID 13, slq1, partition 4,NODE_LOCAL, 2354 bytes)

16/04/02 05:03:46 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 11) in 1205 ms on slq1 (1/5)

16/04/02 05:03:47 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 4.0 (TID 14, slq1, partition 5,NODE_LOCAL, 2354 bytes)

16/04/02 05:03:47 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 4.0 (TID 13) in 703 ms on slq1 (2/5)

16/04/02 05:03:47 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on slq2:44836 (size: 19.7 KB, free: 517.3 MB)

16/04/02 05:03:49 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 4.0 (TID 14) in 2032 ms on slq1 (3/5)

16/04/02 05:03:52 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 10) in 7654 ms on slq3 (4/5)

16/04/02 05:03:54 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 4.0 (TID 12) in 9789 ms on slq2 (5/5)

16/04/02 05:03:54 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool

16/04/02 05:03:54 INFO scheduler.DAGScheduler: ResultStage 4 (show at <console>:31) finished in 9.805 s

16/04/02 05:03:54 INFO scheduler.DAGScheduler: Job 4 finished: show at <console>:31, took 9.980420 s

+------+------+------+---+

|single|double|triple|key|

+------+------+------+---+

|     6|  null|    18|  2|

|     7|  null|    21|  2|

|     8|  null|    24|  2|

|     9|  null|    27|  2|

|    10|  null|    30|  2|

|     1|     2|  null|  1|

|     2|     4|  null|  1|

|     3|     6|  null|  1|

|     4|     8|  null|  1|

|     5|    10|  null|  1|

+------+------+------+---+

 

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on 192.168.1.121:56069 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on slq1:44043 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on slq3:53765 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_5_piece0 on slq2:44836 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO spark.ContextCleaner: Cleaned accumulator 8

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_4_piece0 on 192.168.1.121:56069 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_4_piece0 on slq1:44043 in memory (size: 3.9 KB, free: 517.3 MB)

16/04/02 05:09:12 INFO spark.ContextCleaner: Cleaned accumulator 7

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on 192.168.1.121:56069 in memory (size: 19.7 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on slq3:53765 in memory (size: 19.7 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on slq2:44836 in memory (size: 19.7 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_3_piece0 on slq1:44043 in memory (size: 19.7 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on 192.168.1.121:56069 in memory (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on slq1:44043 in memory (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on slq3:53765 in memory (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on slq2:44836 in memory (size: 21.0 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO spark.ContextCleaner: Cleaned accumulator 6

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.1.121:56069 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on slq3:53765 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on slq2:44836 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 05:09:12 INFO storage.BlockManagerInfo: Removed broadcast_1_piece0 on slq1:44043 in memory (size: 24.6 KB, free: 517.4 MB)

16/04/02 05:09:13 INFO spark.ContextCleaner: Cleaned accumulator 5

16/04/02 05:09:13 INFO spark.ContextCleaner: Cleaned accumulator 4
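The key column in the table above does not exist in either DataFrame that was written; it comes from the key=1 and key=2 directory names. A minimal sketch of that idea in plain Scala follows. `partitionValues` is a hypothetical helper, not a Spark API, and unlike Spark (which reported key as an integer column) it leaves the values as strings.

```scala
object PartitionPathSketch {
  // Extract name=value segments from a path such as
  // "data/text_table/key=1/some-file.parquet".
  // Segments without "=" (directories, file names) are ignored.
  def partitionValues(path: String): Map[String, String] =
    path.split("/").toSeq.flatMap { segment =>
      segment.split("=", 2) match {
        case Array(name, value) if name.nonEmpty => Some(name -> value)
        case _ => None
      }
    }.toMap
}
```

For a path under `data/text_table/key=2`, the helper returns a map with the single entry `key -> 2`, which corresponds to the value shown in the key column for those rows.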

 

 

 

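The example uses the df.write method to write the DataFrame to HDFS in Parquet format.

The rest of these notes read the example from the source-code perspective.

The write method can be found in DataFrame.scala: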

/**
 * :: Experimental ::
 * Interface for saving the content of the [[DataFrame]] out into external storage.
 *
 * @group output
 * @since 1.4.0
 */
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

As shown, DataFrame's write method simply creates a DataFrameWriter instance.

The parquet method can be found in the DataFrameWriter class:

/**
 * Saves the content of the [[DataFrame]] in Parquet format at the specified path.
 * This is equivalent to:
 * {{{
 *   format("parquet").save(path)
 * }}}
 *
 * @since 1.4.0
 */
def parquet(path: String): Unit = format("parquet").save(path)

As shown, the parquet method is just a shortcut for format("parquet").save(path).

The source of the format method is:

/**
 * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
 *
 * @since 1.4.0
 */
def format(source: String): DataFrameWriter = {
  this.source = source
  this
}

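The format method only records the format name ("parquet") in the writer's source field and returns the writer itself, so that save can be chained onto it: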

/**
 * Saves the content of the [[DataFrame]] at the specified path.
 *
 * @since 1.4.0
 */
def save(path: String): Unit = {
  this.extraOptions += ("path" -> path)
  save()
}

As shown, save(path) adds the path to extraOptions, which is a field declared as:

private var extraOptions = new scala.collection.mutable.HashMap[String, String]

So extraOptions is a mutable HashMap.
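The chain parquet -> format -> save(path) -> save() can be imitated in a few lines of plain Scala. This is a hedged sketch of the fluent-builder shape only: `MiniWriter` is a hypothetical class, and its save() returns the collected values instead of performing a write.

```scala
import scala.collection.mutable

object WriterSketch {
  // Hypothetical miniature of the writer shape discussed above.
  class MiniWriter(defaultSource: String) {
    private var source: String = defaultSource
    private val extraOptions = mutable.HashMap.empty[String, String]

    // Records the format name and returns the same writer for chaining.
    def format(name: String): MiniWriter = {
      source = name
      this
    }

    // Stores the path as an ordinary option, then delegates to save().
    def save(path: String): (String, Map[String, String]) = {
      extraOptions += ("path" -> path)
      save()
    }

    // The real save() passes these values on; this sketch just returns them.
    def save(): (String, Map[String, String]) = (source, extraOptions.toMap)

    // Shortcut equivalent to format("parquet").save(path).
    def parquet(path: String): (String, Map[String, String]) =
      format("parquet").save(path)
  }
}
```

Calling `new WriterSketch.MiniWriter("json").parquet("data/t")` shows that the shortcut overrides the default source with "parquet" and that the path ends up as an entry in the options map.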

save(path) then calls the no-argument save() method:

/**
 * Saves the content of the [[DataFrame]] as the specified table.
 *
 * @since 1.4.0
 */
def save(): Unit = {
  ResolvedDataSource(
    df.sqlContext,
    source,
    partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
    mode,
    extraOptions.toMap,
    df)
}

The save() method mainly calls the apply method of ResolvedDataSource:

  /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
  def apply(
      sqlContext: SQLContext,  // corresponds to df.sqlContext in save()
      provider: String,    // corresponds to source in save(), i.e. the format name "parquet"
      partitionColumns: Array[String],
      mode: SaveMode,
      options: Map[String, String],
      data: DataFrame): ResolvedDataSource = {
    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
      throw new AnalysisException("Cannot save interval data type into external storage.")
    }
    val clazz: Class[_] = lookupDataSource(provider)
    val relation = clazz.newInstance() match {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sqlContext, mode, options, data)
      case dataSource: HadoopFsRelationProvider =>
        // Don't glob path for the write path.  The contracts here are:
        //  1. Only one output path can be specified on the write path;
        //  2. Output path must be a legal HDFS style file system path;
        //  3. It's OK that the output path doesn't exist yet;
        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
        val outputPath = {
          val path = new Path(caseInsensitiveOptions("path"))
          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
        }

        val caseSensitive = sqlContext.conf.caseSensitiveAnalysis
        PartitioningUtils.validatePartitionColumnDataTypes(
          data.schema, partitionColumns, caseSensitive)

        val equality = columnNameEquality(caseSensitive)
        val dataSchema = StructType(
          data.schema.filterNot(f => partitionColumns.exists(equality(_, f.name))))
        val r = dataSource.createRelation(
          sqlContext,
          Array(outputPath.toString),
          Some(dataSchema.asNullable),
          Some(partitionColumnsSchema(data.schema, partitionColumns, caseSensitive)),
          caseInsensitiveOptions)

        // For partitioned relation r, r.schema's column ordering can be different from the column
        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
        // will be adjusted within InsertIntoHadoopFsRelation.
        sqlContext.executePlan(
          InsertIntoHadoopFsRelation(
            r,
            data.logicalPlan,
            mode)).toRdd
        r
      case _ =>
        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
    }
    ResolvedDataSource(clazz, relation)
  }
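One step in the HadoopFsRelationProvider branch above is easy to miss: dataSchema is the DataFrame's schema with the partition columns filtered out, using a name comparison that depends on the case-sensitivity setting. The following plain-Scala sketch illustrates that step on column names only. `sameName` and `split` are hypothetical helpers, and returning the partition columns as the second element is a simplification of what partitionColumnsSchema does.

```scala
object SchemaSplitSketch {
  // Name comparison that honours the case-sensitivity setting.
  def sameName(caseSensitive: Boolean)(a: String, b: String): Boolean =
    if (caseSensitive) a == b else a.equalsIgnoreCase(b)

  // Returns (data columns, partition columns) for the given column names.
  def split(
      columns: Seq[String],
      partitionColumns: Seq[String],
      caseSensitive: Boolean): (Seq[String], Seq[String]) =
    columns.partition { c =>
      // A column is a data column when no partition column matches its name.
      !partitionColumns.exists(p => sameName(caseSensitive)(p, c))
    }
}
```

With case-insensitive matching, a column named "Key" is treated as the partition column "key" and removed from the data columns; with case-sensitive matching it stays among the data columns.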

 

The source used in save() is defined as:

private var source: String = df.sqlContext.conf.defaultDataSourceName

The defaultDataSourceName method on SQLContext's conf is:

private[spark] def defaultDataSourceName: String = getConf(DEFAULT_DATA_SOURCE_NAME)

In SQLConf.scala:
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = stringConf("spark.sql.sources.default",
  defaultValue = Some("org.apache.spark.sql.parquet"),
  doc = "The default data source to use in input/output.")

That is, the default data source is parquet.
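The lookup above amounts to "use the configured value if present, otherwise the entry's default". A minimal sketch of that behaviour in plain Scala follows. `StringEntry` and `MiniConf` are hypothetical names, not the real SQLConf classes; only the key and default value are taken from the excerpt.

```scala
object ConfSketch {
  // Hypothetical stand-in for a string config entry with a default value.
  final case class StringEntry(key: String, default: String)

  // Key and default copied from the SQLConf excerpt above.
  val DefaultDataSourceName: StringEntry =
    StringEntry("spark.sql.sources.default", "org.apache.spark.sql.parquet")

  // Holds explicitly set values; falls back to the entry's default.
  class MiniConf(settings: Map[String, String]) {
    def getConf(entry: StringEntry): String =
      settings.getOrElse(entry.key, entry.default)
  }
}
```

An empty `MiniConf` resolves the entry to the parquet default, while a conf that sets `spark.sql.sources.default` to another format name returns that name instead.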

 

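parquet.block.size is roughly the size after compression. When the data is read, it may still be in its encoded form.

 

Inside a page there are repetition levels, definition levels, and the data.

In Java, binary data is simply a byte stream.

Parquet is very memory-hungry: it uses a high compression ratio and many caches.

The decompressed size is 5-10 times the compressed size.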

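The BlockSize is left at its default, given in the lecture as 256 MB (parquet-mr's own default for parquet.block.size is 128 MB, so the 256 MB figure presumably reflects the lecture's environment).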
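Taking the figures in these notes at face value (a 256 MB block and a 5-10x expansion on decompression), the memory needed to hold one decompressed block can be estimated with simple arithmetic. The sketch below is only that arithmetic; the names are hypothetical and the ratios are the notes' rough figures, not measurements.

```scala
object BlockMemorySketch {
  val MiB: Long = 1024L * 1024L

  // Estimated decompressed bytes for one block, given an expansion ratio.
  def decompressedBytes(blockSizeBytes: Long, expansionRatio: Int): Long =
    blockSizeBytes * expansionRatio

  // Block size quoted in the notes.
  val blockSize: Long = 256L * MiB

  // Lower and upper estimates using the 5x and 10x ratios from the notes.
  val low: Long = decompressedBytes(blockSize, 5)   // 1280 MiB
  val high: Long = decompressedBytes(blockSize, 10) // 2560 MiB
}
```

So a single block could occupy roughly 1.25 to 2.5 GiB once decompressed, which is consistent with the remark that Parquet is memory-hungry.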

 

 

 

 

 

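The above are study notes for Lesson 64 of 《IMF传奇行动》 from 王家林's DT大数据梦工厂.
王家林 is a China-region evangelist for Spark, Flink, Docker, and Android; dean and chief expert of the Spark亚太研究院; founder of DT大数据梦工厂; a source-level expert in Android software-hardware integration; an "English pronunciation magician"; and a fitness enthusiast.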

WeChat official account: DT_Spark

Contact email: 18610086859@126.com

Phone: 18610086859

QQ: 1740415547

WeChat ID: 18610086859

Sina Weibo: ilovepains


 

 
