Atguigu (尚硅谷) Big Data - Data Lake Hudi Video Tutorial - Notes 03 [Hudi Integration with Spark]
The New Big Data Frontier: the Hudi Data Lake (jointly produced by Atguigu & Apache Hudi)
- Bilibili: https://www.bilibili.com/video/BV1ue4y1i7na (Atguigu Data Lake Hudi video tutorial)
- Baidu Netdisk: https://pan.baidu.com/s/1NkPku5Pp-l0gfgoo63hR-Q?pwd=yyds
- Aliyun Drive: https://www.aliyundrive.com/s/uMCmjb8nGaC (please download the companion materials from the Baidu Netdisk link)
- Atguigu Big Data - Data Lake Hudi Video Tutorial - Notes 01 [Hudi Overview, Compiling and Installing Hudi]
- Atguigu Big Data - Data Lake Hudi Video Tutorial - Notes 02 [Hudi Core Concepts (Basic Concepts, Write Path, Read Path)]
- Atguigu Big Data - Data Lake Hudi Video Tutorial - Notes 03 [Hudi Integration with Spark]
- Atguigu Big Data - Data Lake Hudi Video Tutorial - Notes 04 [Hudi Integration with Flink]
- Atguigu Big Data - Data Lake Hudi Video Tutorial - Notes 05 [Hudi Integration with Hive]
Table of Contents
Chapter 4 Integrating with Spark
026
027
028
029
030
031
032
033
034
035
Chapter 4 Integrating with Spark
026
Chapter 4 Integrating with Spark
4.1 Environment Preparation
4.1.1 Install Spark
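The notes record no commands for this step. Below is a minimal sketch of how the environment is usually prepared, assuming Spark 3.2.x and the hudi-spark3.2 bundle built in Notes 01; all paths, file names and version numbers are illustrative assumptions, not values taken from the course.

# Unpack Spark (location is an assumption)
tar -zxvf spark-3.2.2-bin-hadoop3.2.tgz -C /opt/module/

# Make the Hudi bundle visible to spark-shell / spark-sql by copying it into Spark's jars directory
cp hudi-spark3.2-bundle_2.12-0.12.0.jar /opt/module/spark-3.2.2-bin-hadoop3.2/jars/

# Alternative: skip the copy and pass the bundle at launch time instead
# spark-shell --jars /path/to/hudi-spark3.2-bundle_2.12-0.12.0.jar ...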
4.1.2 Start Hadoop (omitted)
4.2 The spark-shell Approach
4.2.1 Launch spark-shell
1) Launch command
[atguigu@node001 ~]$ spark-shell \
> --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
> --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
> --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18760 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://node001:4040
Spark context available as 'sc' (master = local[*], app id = local-1704790850201).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.2
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_212)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
027
4.2.1 Launch spark-shell
2) Set the table name, base path, and data generator
scala> import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.QuickstartUtils._

scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._

scala> import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SaveMode._

scala> import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceReadOptions._

scala> import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions._

scala> import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig._

scala> val tableName = "hudi_trips_cow"
tableName: String = hudi_trips_cow

scala> val basePath = "file:///tmp/hudi_trips_cow"
basePath: String = file:///tmp/hudi_trips_cow

scala> val dataGen = new DataGenerator
dataGen: org.apache.hudi.QuickstartUtils.DataGenerator = org.apache.hudi.QuickstartUtils$DataGenerator@66e6b022

scala> val inserts = convertToStringList(dataGen.generateInserts(10))
inserts: java.util.List[String] = [{"ts": 1704209002713, "uuid": "58f04a7a-6d32-42a6-8915-dfc00ae845fc", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.4726905879569653, "begin_lon": 0.46157858450465483, "end_lat": 0.754803407008858, "end_lon": 0.9671159942018241, "fare": 34.158284716382845, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1704623138251, "uuid": "d2c871b0-e98e-44ee-815b-09fbcc5771bb", "rider": "rider-213", "driver": "driver-213", "begin_lat": 0.6100070562136587, "begin_lon": 0.8779402295427752, "end_lat": 0.3407870505929602, "end_lon": 0.5030798142293655, "fare": 43.4923811219014, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1704362707376, "uuid": "74e4699e-3644-477e-9d12-bf83a67c59c1", "rider": "rider-213", "driver"...

scala> val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
warning: one deprecation (since 2.12.0)
warning: one deprecation (since 2.2.0)
warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation'
df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields]

scala> df.write.format("hudi").
     |   options(getQuickstartWriteConfigs).
     |   option(PRECOMBINE_FIELD_OPT_KEY, "ts").
     |   option(RECORDKEY_FIELD_OPT_KEY, "uuid").
     |   option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
     |   option(TABLE_NAME, tableName).
     |   mode(Overwrite).
     |   save(basePath)
warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation'
7744528 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
7744558 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
7747037 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path file:///tmp/hudi_trips_cow/.hoodie/metadata

scala>
028
4.2.3 Query Data
scala> val tripsSnapshotDF = spark. | read. | format("hudi"). | load(basePath) tripsSnapshotDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields] scala> tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") scala> scala> spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() +------------------+-------------------+-------------------+-------------+ | fare| begin_lon| begin_lat| ts| +------------------+-------------------+-------------------+-------------+ | 33.92216483948643| 0.9694586417848392| 0.1856488085068272|1704256809374| | 64.27696295884016| 0.4923479652912024| 0.5731835407930634|1704362707376| | 93.56018115236618|0.14285051259466197|0.21624150367601136|1704216687193| | 27.79478688582596| 0.6273212202489661|0.11488393157088261|1704494741512| | 43.4923811219014| 0.8779402295427752| 0.6100070562136587|1704623138251| | 66.62084366450246|0.03844104444445928| 0.0750588760043035|1704568660989| |34.158284716382845|0.46157858450465483| 0.4726905879569653|1704209002713| | 41.06290929046368| 0.8192868687714224| 0.651058505660742|1704563419431| +------------------+-------------------+-------------------+-------------+ scala> spark.sql("select * from hudi_trips_snapshot where fare > 20.0").show() +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+ |_hoodie_commit_time|_hoodie_commit_seqno| _hoodie_record_key|_hoodie_partition_path| _hoodie_file_name| begin_lat| begin_lon| driver| end_lat| end_lon| fare| rider| ts| uuid| partitionpath| +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+ | 20240109190932344|20240109190932344...|ca1d00a7-8eeb-49c...| americas/united_s...|7608cfb3-4a20-418...| 0.1856488085068272| 0.9694586417848392|driver-213|0.38186367037201974|0.25252652214479043| 33.92216483948643|rider-213|1704256809374|ca1d00a7-8eeb-49c...|americas/united_s...| | 20240109190932344|20240109190932344...|74e4699e-3644-477...| americas/united_s...|7608cfb3-4a20-418...| 0.5731835407930634| 0.4923479652912024|driver-213|0.08988581780930216|0.42520899698713666| 64.27696295884016|rider-213|1704362707376|74e4699e-3644-477...|americas/united_s...| | 20240109190932344|20240109190932344...|fb17af56-2f63-481...| americas/united_s...|7608cfb3-4a20-418...|0.21624150367601136|0.14285051259466197|driver-213| 0.5890949624813784| 0.0966823831927115| 93.56018115236618|rider-213|1704216687193|fb17af56-2f63-481...|americas/united_s...| | 20240109190932344|20240109190932344...|c745152f-893d-461...| americas/united_s...|7608cfb3-4a20-418...|0.11488393157088261| 0.6273212202489661|driver-213| 0.7454678537511295| 0.3954939864908973| 27.79478688582596|rider-213|1704494741512|c745152f-893d-461...|americas/united_s...| | 20240109190932344|20240109190932344...|d2c871b0-e98e-44e...| americas/brazil/s...|1c69c03e-47de-4d9...| 0.6100070562136587| 0.8779402295427752|driver-213| 0.3407870505929602| 0.5030798142293655| 43.4923811219014|rider-213|1704623138251|d2c871b0-e98e-44e...|americas/brazil/s...| | 
20240109190932344|20240109190932344...|6983e2a6-61e4-4df...| americas/brazil/s...|1c69c03e-47de-4d9...| 0.0750588760043035|0.03844104444445928|driver-213|0.04376353354538354| 0.6346040067610669| 66.62084366450246|rider-213|1704568660989|6983e2a6-61e4-4df...|americas/brazil/s...| | 20240109190932344|20240109190932344...|58f04a7a-6d32-42a...| americas/brazil/s...|1c69c03e-47de-4d9...| 0.4726905879569653|0.46157858450465483|driver-213| 0.754803407008858| 0.9671159942018241|34.158284716382845|rider-213|1704209002713|58f04a7a-6d32-42a...|americas/brazil/s...| | 20240109190932344|20240109190932344...|f464f5d9-c284-4a8...| asia/india/chennai|38325a98-dd9a-453...| 0.651058505660742| 0.8192868687714224|driver-213|0.20714896002914462|0.06224031095826987| 41.06290929046368|rider-213|1704563419431|f464f5d9-c284-4a8...| asia/india/chennai| +-------------------+--------------------+--------------------+----------------------+--------------------+-------------------+-------------------+----------+-------------------+-------------------+------------------+---------+-------------+--------------------+--------------------+ scala>
029
4.2.4 Update Data
val tripsSnapshotDF1 = spark.read.format("hudi").load(basePath)
tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot1").show(20)
scala> val tripsSnapshotDF1 = spark.read.format("hudi").load(basePath)
tripsSnapshotDF1: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields]

scala> tripsSnapshotDF1.createOrReplaceTempView("hudi_trips_snapshot1")

scala> spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show(20)
+-------------------+--------------------+----------------------+---------+----------+------------------+
|_hoodie_commit_time| _hoodie_record_key|_hoodie_partition_path| rider| driver| fare|
+-------------------+--------------------+----------------------+---------+----------+------------------+
| 20240123152755513|c1f36d67-9031-415...| americas/united_s...|rider-213|driver-213| 64.27696295884016|
| 20240123152755513|a1914586-f6f8-468...| americas/united_s...|rider-213|driver-213| 33.92216483948643|
| 20240123152755513|7779f311-34fb-412...| americas/united_s...|rider-213|driver-213| 93.56018115236618|
| 20240123152755513|bc90314b-537a-409...| americas/united_s...|rider-213|driver-213| 27.79478688582596|
| 20240123152755513|aef5f9e3-e31a-42c...| americas/united_s...|rider-213|driver-213|19.179139106643607|
| 20240123152755513|1dda1939-c3a7-488...| americas/brazil/s...|rider-213|driver-213|34.158284716382845|
| 20240123152755513|7f6b775b-1480-425...| americas/brazil/s...|rider-213|driver-213| 43.4923811219014|
| 20240123152755513|ce7f6bb2-53de-46b...| americas/brazil/s...|rider-213|driver-213| 66.62084366450246|
| 20240123152755513|10b632a9-59e5-4ef...| asia/india/chennai|rider-213|driver-213|17.851135255091155|
| 20240123152755513|8a78e424-e64a-40f...| asia/india/chennai|rider-213|driver-213| 41.06290929046368|
+-------------------+--------------------+----------------------+---------+----------+------------------+

scala>
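The transcript above only re-reads the current snapshot; the update write itself is not shown until the next episode's transcript (4.2.5). For completeness, a minimal sketch of the update step, using the same quickstart pattern as the insert and reusing dataGen, tableName and basePath defined earlier. Note mode(Append) rather than Overwrite, so the write becomes an upsert against the existing keys.

// Generate updates against previously inserted keys and upsert them
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).                       // Append = upsert into the existing table
  save(basePath)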
4.2.3 Query Data
3) Time travel query
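No commands are recorded under this point. A minimal sketch, assuming Spark 3.2+ and the Hudi datasource's "as.of.instant" read option; the instant below is the commit time visible in the 4.2.3 query output, so substitute a commit time from your own table's timeline.

// Read the table as of a specific commit instant
spark.read.format("hudi").
  option("as.of.instant", "20240109190932344").
  load(basePath).
  show()

// The option also accepts date-time strings such as "2024-01-09 19:09:32.344" or "2024-01-09"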
030
4.2.5 Incremental Query
scala> spark. | read. | format("hudi"). | load(basePath). | createOrReplaceTempView("hudi_trips_snapshot") scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) commits: Array[String] = Array(20240123152755513) scala> val beginTime = commits(commits.length - 2) java.lang.ArrayIndexOutOfBoundsException: -1 ... 59 elided scala> scala> scala> val updates = convertToStringList(dataGen.generateUpdates(10)) updates: java.util.List[String] = [{"ts": 1705879465411, "uuid": "c1f36d67-9031-4157-a8d2-13b6d30b1572", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.7340133901254792, "begin_lon": 0.5142184937933181, "end_lat": 0.7814655558162802, "end_lon": 0.6592596683641996, "fare": 49.527694252432056, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1705399614657, "uuid": "1dda1939-c3a7-4884-9b70-4ef87bc050f9", "rider": "rider-284", "driver": "driver-284", "begin_lat": 0.1593867607188556, "begin_lon": 0.010872312870502165, "end_lat": 0.9808530350038475, "end_lon": 0.7963756520507014, "fare": 29.47661370147079, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1705403081035, "uuid": "1dda1939-c3a7-4884-9b70-4ef87bc050f9", "rider": "rider-... scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) warning: one deprecation (since 2.12.0) warning: one deprecation (since 2.2.0) warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation' df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] scala> df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath) warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation' scala> val updates = convertToStringList(dataGen.generateUpdates(10)) updates: java.util.List[String] = [{"ts": 1705790965039, "uuid": "c1f36d67-9031-4157-a8d2-13b6d30b1572", "rider": "rider-243", "driver": "driver-243", "begin_lat": 0.9045189017781902, "begin_lon": 0.38697902072535484, "end_lat": 0.21932410786717094, "end_lon": 0.7816060218244935, "fare": 44.596839246210095, "partitionpath": "americas/united_states/san_francisco"}, {"ts": 1705482677807, "uuid": "ce7f6bb2-53de-46bd-87f8-ff19f367bd1d", "rider": "rider-243", "driver": "driver-243", "begin_lat": 0.856152038750905, "begin_lon": 0.3132477949501916, "end_lat": 0.8742438057467156, "end_lon": 0.26923247017036556, "fare": 2.4995362119815567, "partitionpath": "americas/brazil/sao_paulo"}, {"ts": 1705754253073, "uuid": "c1f36d67-9031-4157-a8d2-13b6d30b1572", "rider": "rider... scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) warning: one deprecation (since 2.12.0) warning: one deprecation (since 2.2.0) warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation' df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] scala> df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). 
| save(basePath) warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation' scala> val updates = convertToStringList(dataGen.generateUpdates(10)) updates: java.util.List[String] = [{"ts": 1705393408046, "uuid": "10b632a9-59e5-4ef4-811c-1250a817c74a", "rider": "rider-563", "driver": "driver-563", "begin_lat": 0.16172715555352513, "begin_lon": 0.6286940931025506, "end_lat": 0.7559063825441225, "end_lon": 0.39828516291900906, "fare": 16.098476392187365, "partitionpath": "asia/india/chennai"}, {"ts": 1705982569737, "uuid": "8a78e424-e64a-40f8-8eb7-f8e3741ab17e", "rider": "rider-563", "driver": "driver-563", "begin_lat": 0.9312237784651692, "begin_lon": 0.67243450582925, "end_lat": 0.28393433672984614, "end_lon": 0.2725166210142148, "fare": 27.603571822228822, "partitionpath": "asia/india/chennai"}, {"ts": 1705602950761, "uuid": "bc90314b-537a-409e-9d93-9c8663d578cc", "rider": "rider-563", "driver": "driver-5... scala> val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) warning: one deprecation (since 2.12.0) warning: one deprecation (since 2.2.0) warning: two deprecations in total; for details, enable `:setting -deprecation' or `:replay -deprecation' df: org.apache.spark.sql.DataFrame = [begin_lat: double, begin_lon: double ... 8 more fields] scala> df.write.format("hudi"). | options(getQuickstartWriteConfigs). | option(PRECOMBINE_FIELD_OPT_KEY, "ts"). | option(RECORDKEY_FIELD_OPT_KEY, "uuid"). | option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). | option(TABLE_NAME, tableName). | mode(Append). | save(basePath) warning: one deprecation; for details, enable `:setting -deprecation' or `:replay -deprecation' scala> scala> scala> spark. | read. | format("hudi"). | load(basePath). | createOrReplaceTempView("hudi_trips_snapshot") scala> val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) commits: Array[String] = Array(20240123155641315, 20240123155717896, 20240123155726796) scala> val beginTime = commits(commits.length - 2) beginTime: String = 20240123155717896 scala> val tripsIncrementalDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | load(basePath) tripsIncrementalDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 
13 more fields] scala> tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show() +-------------------+------------------+-------------------+------------------+-------------+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+-------------------+------------------+-------------+ | 20240123155726796| 54.16944371261484| 0.7548086309564753|0.5535762898838785|1705960250638| | 20240123155726796| 37.35848234860164| 0.9084944020139248|0.6330100459693088|1705652568556| | 20240123155726796| 84.66949742559657|0.31331111382522836|0.8573834026158349|1705867633650| | 20240123155726796| 38.4828225162323|0.20404106962358204|0.1450793330198833|1705405789140| | 20240123155726796| 55.31092276192561| 0.826183030502974| 0.391583018565109|1705428608507| | 20240123155726796|27.603571822228822| 0.67243450582925|0.9312237784651692|1705982569737| +-------------------+------------------+-------------------+------------------+-------------+ scala>
4.2.6 Point-in-Time Query
scala> val beginTime = "000" beginTime: String = 000 scala> val endTime = commits(commits.length - 2) endTime: String = 20240123155717896 scala> val tripsPointInTimeDF = spark.read.format("hudi"). | option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). | option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). | option(END_INSTANTTIME_OPT_KEY, endTime). | load(basePath) tripsPointInTimeDF: org.apache.spark.sql.DataFrame = [_hoodie_commit_time: string, _hoodie_commit_seqno: string ... 13 more fields] scala> tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") scala> spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show() +-------------------+------------------+-------------------+-------------------+-------------+ |_hoodie_commit_time| fare| begin_lon| begin_lat| ts| +-------------------+------------------+-------------------+-------------------+-------------+ | 20240123155717896|44.596839246210095|0.38697902072535484| 0.9045189017781902|1705790965039| | 20240123155641315| 90.9053809533154|0.19949323322922063|0.18294079059016366|1705742911148| | 20240123155717896|26.636532270940915|0.12314538318119372|0.35527775182006427|1705655264707| | 20240123155717896| 51.42305232303094| 0.7071871604905721| 0.876334576190389|1705609025064| | 20240123155641315| 91.99515909032544| 0.2783086084578943| 0.2110206104048945|1705585303503| | 20240123155717896| 89.45841313717807|0.22991770617403628| 0.6923616674358241|1705771716835| | 20240123155717896| 71.08018349571618| 0.8150991077375751|0.01925237918893319|1705812754018| +-------------------+------------------+-------------------+-------------------+-------------+ scala>
031
4.2.7 Delete Data
3.2.6 Deletion Strategies
1) Soft delete: keep the record key and mark all other value fields as null.
2) Hard delete: three ways (a sketch follows this list):
(1) Set OPERATION_OPT_KEY to the delete operation, which removes every record in the incoming dataset
(2) Set PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload, which likewise removes every record in the incoming dataset
(3) Add a _hoodie_is_deleted field to the incoming records
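The spark-shell commands for this step are not captured in the notes. Below is a minimal sketch of way (1), hard delete via OPERATION_OPT_KEY, following the Hudi quickstart; dataGen.generateDeletes and the two-record sample are taken from that quickstart, so treat the exact helper names as assumptions if your Hudi version differs.

// Pick two existing records from the snapshot view and turn them into delete payloads
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
val deletes = dataGen.generateDeletes(ds.collectAsList())
val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

deleteDF.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "delete").                  // way (1): delete every record in the input
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)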
4.2.8 Overwrite Data
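This section has no commands in the notes. A minimal sketch of insert_overwrite in spark-shell, again following the quickstart write pattern; only the OPERATION_OPT_KEY value changes, and whether whole partitions or the whole table are replaced depends on choosing insert_overwrite versus insert_overwrite_table.

// Generate fresh records and overwrite the partitions they fall into, instead of upserting
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY, "insert_overwrite").        // replaces file groups in the touched partitions
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)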
032
4.3 The Spark SQL Approach
4.3.1 Create Tables
[atguigu@node001 ~]$ nohup hive --service metastore & [1] 11371 [atguigu@node001 ~]$ nohup: 忽略输入并把输出追加到"nohup.out" [atguigu@node001 ~]$ jpsall ================ node001 ================ 3472 NameNode 4246 NodeManager 4455 JobHistoryServer 11384 -- process information unavailable 10456 SparkSubmit 3642 DataNode 4557 SparkSubmit 11437 Jps ================ node002 ================ 6050 Jps 2093 DataNode 2495 NodeManager 2335 ResourceManager ================ node003 ================ 5685 Jps 2279 SecondaryNameNode 2459 NodeManager 2159 DataNode [atguigu@node001 ~]$ netstat -anp | grep 9083 (Not all processes could be identified, non-owned process info will not be shown, you would have to be root to see it all.) tcp6 0 0 :::9083 :::* LISTEN 11371/java [atguigu@node001 ~]$ spark-sql \ > --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ > --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \ > --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 0 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 1410 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 1410 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist 8317 [main] WARN org.apache.spark.util.Utils - Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 8319 [main] WARN org.apache.spark.util.Utils - Service 'SparkUI' could not bind on port 4041. Attempting port 4042. Spark master: local[*], Application Id: local-1706012797408 spark-sql (default)> show databases; namespace default edu2077 Time taken: 11.185 seconds, Fetched 2 row(s) spark-sql (default)> > create database spark_hudi; Response code Time taken: 11.725 seconds spark-sql (default)> use spark_hudi; Response code Time taken: 0.673 seconds spark-sql (default)> create table hudi_cow_nonpcf_tbl ( > uuid int, > name string, > price double > ) using hudi; 422168 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf 422406 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file 425565 [main] WARN org.apache.hadoop.hive.ql.session.SessionState - METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory. 
Response code Time taken: 10.492 seconds spark-sql (default)> show tables; namespace tableName isTemporary hudi_cow_nonpcf_tbl Time taken: 1.444 seconds, Fetched 1 row(s) spark-sql (default)> desc hudi_cow_nonpcf_tbl; col_name data_type comment _hoodie_commit_time string _hoodie_commit_seqno string _hoodie_record_key string _hoodie_partition_path string _hoodie_file_name string uuid int name string price double Time taken: 2.179 seconds, Fetched 8 row(s) spark-sql (default)> create table hudi_mor_tbl ( > id int, > name string, > price double, > ts bigint > ) using hudi > tblproperties ( > type = 'mor', > primaryKey = 'id', > preCombineField = 'ts' > ); Response code Time taken: 1.803 seconds spark-sql (default)> show tables; namespace tableName isTemporary hudi_cow_nonpcf_tbl hudi_mor_tbl Time taken: 0.257 seconds, Fetched 2 row(s) spark-sql (default)> desc hudi_mor_tbl; col_name data_type comment _hoodie_commit_time string _hoodie_commit_seqno string _hoodie_record_key string _hoodie_partition_path string _hoodie_file_name string id int name string price double ts bigint Time taken: 0.636 seconds, Fetched 9 row(s) spark-sql (default)> create table hudi_cow_pt_tbl ( > id bigint, > name string, > ts bigint, > dt string, > hh string > ) using hudi > tblproperties ( > type = 'cow', > primaryKey = 'id', > preCombineField = 'ts' > ) > partitioned by (dt, hh) > location '/tmp/hudi/hudi_cow_pt_tbl'; Response code Time taken: 21.88 seconds spark-sql (default)> create table hudi_ctas_cow_nonpcf_tbl > using hudi > tblproperties (primaryKey = 'id') > as > select 1 as id, 'a1' as name, 10 as price; 1514254 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path file:/home/atguigu/spark-warehouse/spark_hudi.db/hudi_ctas_cow_nonpcf_tbl/.hoodie/metadata 1546852 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 1546852 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 63.078 seconds spark-sql (default)> select * from hudi_ctas_cow_nonpcf_tbl; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name price 20240123205131373 20240123205131373_0_0 id:1 bee32427-f490-40dd-89ed-3bacd3adf6fb-0_0-17-15_20240123205131373.parquet 1 a1 10 Time taken: 1.636 seconds, Fetched 1 row(s) spark-sql (default)> create table hudi_ctas_cow_pt_tbl > using hudi > tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts') > partitioned by (dt) > as > select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt; 1646675 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path file:/home/atguigu/spark-warehouse/spark_hudi.db/hudi_ctas_cow_pt_tbl/.hoodie/metadata 1664435 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 1664435 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 26.019 seconds spark-sql (default)> select * from hudi_ctas_cow_pt_tbl; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name price ts dt 20240123205354771 20240123205354771_0_0 id:1 dt=2021-12-01 88ad8031-2239-4bce-8494-5bf109012400-0_0-69-1259_20240123205354771.parquet 1 a1 10 1000 2021-12-01 
Time taken: 2.829 seconds, Fetched 1 row(s) spark-sql (default)>
033
4.3.2 Insert Data
spark-sql (default)> show tables; namespace tableName isTemporary hudi_cow_nonpcf_tbl hudi_cow_pt_tbl hudi_ctas_cow_nonpcf_tbl hudi_ctas_cow_pt_tbl hudi_mor_tbl Time taken: 1.298 seconds, Fetched 5 row(s) spark-sql (default)> insert into hudi_cow_nonpcf_tbl 1, 'a1', 20; Error in query: mismatched input '1' expecting {'(', 'FROM', 'MAP', 'REDUCE', 'SELECT', 'TABLE', 'VALUES'}(line 1, pos 32) == SQL == insert into hudi_cow_nonpcf_tbl 1, 'a1', 20 --------------------------------^^^ spark-sql (default)> insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20; 458055 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf 458212 [main] WARN org.apache.hudi.common.config.DFSPropertiesConfiguration - Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file 475300 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path file:/home/atguigu/spark-warehouse/spark_hudi.db/hudi_cow_nonpcf_tbl/.hoodie/metadata 517206 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 517206 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 80.477 seconds spark-sql (default)> insert into hudi_mor_tbl select 1, 'a1', 20, 1000; 640716 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path file:/home/atguigu/spark-warehouse/spark_hudi.db/hudi_mor_tbl/.hoodie/metadata 694713 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 694723 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 71.597 seconds spark-sql (default)> insert into hudi_cow_pt_tbl partition (dt, hh) > select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh; 743011 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl/.hoodie/metadata 04:48 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. 04:49 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. 04:53 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. Response code Time taken: 46.44 seconds spark-sql (default)> insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000; 10:06 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. 10:07 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. 10:11 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl. Falling back to direct markers. 
Response code Time taken: 71.592 seconds spark-sql (default)> select * from hudi_mor_tbl; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name price ts 20240125160415749 20240125160415749_0_0 id:1 a81ca1da-f642-45c5-ac7e-eb93de803ba8-0_0-65-1253_20240125160415749.parquet 1 a1 20.0 1000 Time taken: 23.135 seconds, Fetched 1 row(s) spark-sql (default)> -- 向指定preCombineKey的表插入数据,则写操作为upsert spark-sql (default)> insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001; 1353572 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 1353573 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 46.455 seconds spark-sql (default)> select id, name, price, ts from hudi_mor_tbl; id name price ts 1 a1_1 20.0 1001 Time taken: 2.081 seconds, Fetched 1 row(s) spark-sql (default)> set hoodie.sql.bulk.insert.enable=true; key value hoodie.sql.bulk.insert.enable true Time taken: 0.214 seconds, Fetched 1 row(s) spark-sql (default)> set hoodie.sql.insert.mode=non-strict; key value hoodie.sql.insert.mode non-strict Time taken: 0.027 seconds, Fetched 1 row(s) spark-sql (default)> insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002; 1538483 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.metastore.event.db.notification.api.auth does not exist 1538486 [main] WARN org.apache.hadoop.hive.conf.HiveConf - HiveConf of name hive.server2.active.passive.ha.enable does not exist Response code Time taken: 42.906 seconds spark-sql (default)> select id, name, price, ts from hudi_mor_tbl; id name price ts 1 a1_2 20.0 1002 1 a1_1 20.0 1001 Time taken: 1.015 seconds, Fetched 2 row(s) spark-sql (default)> set hoodie.sql.bulk.insert.enable=false; key value hoodie.sql.bulk.insert.enable false Time taken: 2.396 seconds, Fetched 1 row(s) spark-sql (default)> create table hudi_cow_pt_tbl1 ( > id bigint, > name string, > ts bigint, > dt string, > hh string > ) using hudi > tblproperties ( > type = 'cow', > primaryKey = 'id', > preCombineField = 'ts' > ) > partitioned by (dt, hh) > location '/tmp/hudi/hudi_cow_pt_tbl1'; 1737608 [main] WARN org.apache.hadoop.hive.ql.session.SessionState - METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory. Response code Time taken: 12.013 seconds spark-sql (default)> insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10'; 1765548 [main] WARN org.apache.hudi.metadata.HoodieBackedTableMetadata - Metadata table was not found at path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1/.hoodie/metadata 22:14 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. 22:15 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. 22:23 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. 
Response code Time taken: 64.552 seconds spark-sql (default)> select * from hudi_cow_pt_tbl1; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name ts dt hh 20240125162301446 20240125162301446_0_0 id:1 dt=2021-12-09/hh=10 1e1b2016-8b33-4d4b-a824-4cd53ab7e8ec-0_0-290-5658_20240125162301446.parquet 1a0 1000 2021-12-09 10 Time taken: 1.702 seconds, Fetched 1 row(s) spark-sql (default)> insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10'; 23:04 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. 23:05 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. 23:09 WARN: Timeline-server-based markers are not supported for HDFS: base path hdfs://node001:8020/tmp/hudi/hudi_cow_pt_tbl1. Falling back to direct markers. Response code Time taken: 16.112 seconds spark-sql (default)> select * from hudi_cow_pt_tbl1; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name ts dt hh 20240125162431677 20240125162431677_0_0 id:1 dt=2021-12-09/hh=10 1e1b2016-8b33-4d4b-a824-4cd53ab7e8ec-0_0-329-6292_20240125162431677.parquet 1a1 1001 2021-12-09 10 Time taken: 1.122 seconds, Fetched 1 row(s) spark-sql (default)> select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1; _hoodie_commit_time _hoodie_commit_seqno _hoodie_record_key _hoodie_partition_path _hoodie_file_name id name ts dt hh Time taken: 20.962 seconds spark-sql (default)>
034
4.3.4 Update Data
1) UPDATE
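The statements for this point are not captured in the notes. A minimal sketch of Spark SQL UPDATE against the tables created in 4.3.1; UPDATE requires the table to define a primaryKey and preCombineField, so hudi_mor_tbl and hudi_cow_pt_tbl qualify, and the literal values below are illustrative.

-- Syntax: UPDATE tableIdentifier SET column = expression [, ...] [WHERE condition]
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;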
035
4.3.4 Update Data
2) MergeInto
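Likewise, no statements are recorded for MergeInto. A minimal sketch against hudi_mor_tbl; the inline subquery stands in for any source table, the casts keep the source column types aligned with the target schema, and "update set * / insert *" copies all matching columns from the source.

merge into hudi_mor_tbl as target
using (
  select 1 as id, 'a1_3' as name, cast(30.0 as double) as price, cast(1003 as bigint) as ts
) source
on target.id = source.id
when matched then update set *
when not matched then insert *;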