iceberg1.4.2 +minio通过spark创建表,插入数据
温馨提示:这篇文章已超过397天没有更新,请注意相关的内容是否还可用!
iceberg 是一种开放的表格式管理,解决大数据数据中结构化,非结构化和半结构化不统一的问题。主要是通过对表的管理实现增删改查,同时支持历史回滚(版本旅行)等操作。下层支持hadoop,s3,对象存储,上层支持hive,spark,flink 等应用。实现在中间把两部分隔离开来,实现一种对接和数据管理的标准。有这个标准,不管是谁建的表,都可以操作和访问。比如我用spark创建表,flink去读取的时候,可以读取到数据。不存在组件不同无法识别的情况。
(图片来源网络,侵删)
在idea进行pom.xml配置
4.0.0
org.gbicc
bigdata
1.0-SNAPSHOT
2008
2.12.18
scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases
scala-tools.org
Scala-Tools Maven2 Repository
http://scala-tools.org/repo-releases
org.scala-lang
scala-library
${scala.version}
junit
junit
4.4
test
org.specs
specs
1.2.5
test
org.apache.iceberg
iceberg-core
1.4.2
io.minio
minio
8.5.7
com.amazonaws
aws-java-sdk-s3
1.12.620
org.apache.hadoop
hadoop-aws
3.2.2
org.apache.hadoop
hadoop-common
3.2.2
org.apache.iceberg
iceberg-data
1.4.2
org.apache.spark
spark-core_2.12
3.4.2
org.apache.spark
spark-sql_2.12
3.4.2
org.apache.spark
spark-streaming_2.12
3.4.2
org.apache.iceberg
iceberg-spark
1.4.2
org.apache.iceberg
iceberg-spark-runtime-3.4_2.12
1.4.2
com.fasterxml.jackson.core
jackson-databind
2.14.2
org.apache.iceberg
iceberg-data
1.4.2
com.amazonaws
aws-java-sdk-s3
1.12.620
org.apache.hadoop
hadoop-aws
3.2.2
org.apache.iceberg
iceberg-aws
1.4.2
com.amazonaws
aws-java-sdk-bundle
1.11.375
org.apache.iceberg
iceberg-parquet
1.4.2
io.delta
delta-core_2.12
2.4.0
io.delta
delta-spark_2.12
3.0.0
org.scala-tools
maven-scala-plugin
${scala.version}
下面进行代码编写
package org.icebergtest
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.types.Types
import org.apache.spark.sql.types._
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.{Types => _, _}
object icebergspark {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local").appName("test")
/* .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
.config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.debug.maxToStringFields", "2048")*/
.config("spark.hadoop.fs.s3a.access.key", "minioadmin")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
.config("spark.hadoop.spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
//指定hadoop catalog,catalog名称为hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key", "minioadmin")
.config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key", "minioadmin")
.config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
.config("spark.sql.catalog.hadoop_prod.warehouse", "s3a://test1/")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.getOrCreate()
import org.apache.iceberg.spark.SparkSessionCatalog
// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中
// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中
//1.创建Iceberg表,并插入数据
//spark.sql("create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg".stripMargin)
spark.sql(
"""
|insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)
""".stripMargin)
//1.SQL 方式读取Iceberg中的数据
// spark.sql("select * from hadoop_prod.mydb.mytest").show()
spark.sql(
"""
|select * from hadoop_prod.mydb.mytest VERSION AS OF 4696493712637386339;
""".stripMargin).show()
/**
* 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式
*/
//第一种方式使用DataFrame方式查询Iceberg表数据snapshots,history,manifests,files
val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest.snapshots")
frame1.show()
val frame2: DataFrame = spark.table("hadoop_prod.mydb.mytest.history")
frame2.show()
// spark.read.option("snapshot-id","4696493712637386339"). format("iceberg").load("3a://test/mydb/mytest")
//第二种方式使用DataFrame加载 Iceberg表数据
val frame3: DataFrame = spark.read.format("iceberg").load("hadoop_prod.mydb.mytest")
frame3.show()
}
}
通过上面的例子,直接复制执行
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!
