iceberg1.4.2 +minio通过spark创建表,插入数据

2024-03-08 1286阅读

温馨提示:这篇文章已超过397天没有更新,请注意相关的内容是否还可用!

iceberg 是一种开放的表格式管理,解决大数据数据中结构化,非结构化和半结构化不统一的问题。主要是通过对表的管理实现增删改查,同时支持历史回滚(版本旅行)等操作。下层支持hadoop,s3,对象存储,上层支持hive,spark,flink 等应用。实现在中间把两部分隔离开来,实现一种对接和数据管理的标准。有这个标准,不管是谁建的表,都可以操作和访问。比如我用spark创建表,flink去读取的时候,可以读取到数据。不存在组件不同无法识别的情况。

iceberg1.4.2 +minio通过spark创建表,插入数据
(图片来源网络,侵删)

在idea进行pom.xml配置

  4.0.0
  org.gbicc
  bigdata
  1.0-SNAPSHOT
  2008
  
    2.12.18
  
  
    
      scala-tools.org
      Scala-Tools Maven2 Repository
      http://scala-tools.org/repo-releases
    
  
  
    
      scala-tools.org
      Scala-Tools Maven2 Repository
      http://scala-tools.org/repo-releases
    
  
  
    
      org.scala-lang
      scala-library
      ${scala.version}
    
    
      junit
      junit
      4.4
      test
    
    
      org.specs
      specs
      1.2.5
      test
    
    
    
    
      org.apache.iceberg
      iceberg-core
      1.4.2
    
    
      io.minio
      minio
      8.5.7
    
    
    
      com.amazonaws
      aws-java-sdk-s3
      1.12.620
    
    
      org.apache.hadoop
      hadoop-aws
      3.2.2
    
    
      org.apache.hadoop
      hadoop-common
      3.2.2
    
    
    
      org.apache.iceberg
      iceberg-data
      1.4.2
    
    
    org.apache.spark
    spark-core_2.12
    3.4.2 
  
    
      org.apache.spark
      spark-sql_2.12
      3.4.2 
    
    
      org.apache.spark
      spark-streaming_2.12
      3.4.2 
    
    
    
      org.apache.iceberg
      iceberg-spark
      1.4.2
    
      
      
          org.apache.iceberg
          iceberg-spark-runtime-3.4_2.12
          1.4.2
      
    
      com.fasterxml.jackson.core
      jackson-databind
      2.14.2
    
    
      org.apache.iceberg
      iceberg-data
      1.4.2
    
    
      com.amazonaws
      aws-java-sdk-s3
      1.12.620
    
    
      org.apache.hadoop
      hadoop-aws
      3.2.2
    
    
      org.apache.iceberg
      iceberg-aws
      1.4.2
    
    
      com.amazonaws
      aws-java-sdk-bundle
      1.11.375
    
    
      org.apache.iceberg
      iceberg-parquet
      1.4.2
    
    
      io.delta
      delta-core_2.12
      2.4.0
    
    
      io.delta
      delta-spark_2.12
      3.0.0
    
  
  
    
      
        org.scala-tools
        maven-scala-plugin
        
          ${scala.version}
        
      
    
  

下面进行代码编写

package org.icebergtest
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.types.Types
import org.apache.spark.sql.types._
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.GenericRecord
import org.apache.iceberg.types.{Types => _, _}
object icebergspark {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("test")
      /* .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
       .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
       .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
       .config("spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
       .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
       .config("spark.hadoop.fs.s3a.path.style.access", "true")
       .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
       .config("spark.debug.maxToStringFields", "2048")*/
      .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
      .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
      .config("spark.hadoop.spark.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
      .config("spark.hadoop.fs.s3a.path.style.access", "true")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      //指定hadoop catalog,catalog名称为hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.access.key", "minioadmin")
        .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.secret.key", "minioadmin")
        .config("spark.sql.catalog.hadoop_prod.hadoop.fs.s3a.endpoint", "http://127.0.0.1:9000")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "s3a://test1/")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()
    import org.apache.iceberg.spark.SparkSessionCatalog
    // 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中// 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中
    // 将 Iceberg 的 SparkSessionCatalog 注册到 Spark 中
    //1.创建Iceberg表,并插入数据
    //spark.sql("create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg".stripMargin)
    spark.sql(
      """
        |insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)
      """.stripMargin)
    //1.SQL 方式读取Iceberg中的数据
   // spark.sql("select * from hadoop_prod.mydb.mytest").show()
    spark.sql(
      """
        |select * from hadoop_prod.mydb.mytest VERSION AS OF 4696493712637386339;
      """.stripMargin).show()
    /**
      * 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式
      */
    //第一种方式使用DataFrame方式查询Iceberg表数据snapshots,history,manifests,files
  val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest.snapshots")
   frame1.show()
    val frame2: DataFrame = spark.table("hadoop_prod.mydb.mytest.history")
    frame2.show()
   // spark.read.option("snapshot-id","4696493712637386339"). format("iceberg").load("3a://test/mydb/mytest")
    //第二种方式使用DataFrame加载 Iceberg表数据
   val frame3: DataFrame = spark.read.format("iceberg").load("hadoop_prod.mydb.mytest")
   frame3.show()
  }
}

通过上面的例子,直接复制执行

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]