解读Spark Datasource

网友投稿 964 2022-05-30

在日常的工作中,我们会接触到各种各样的数据库,他们存储的数据也是各式各样。当我们使用Spark去处理数据的时候,我们常常会遇到数据来源于不同数据源的情况。如果重新加载和保存数据的话,会非常的麻烦,还浪费空间,而且很多时候还需要考虑数据格式转换的问题。

为了解决这样的问题,Spark提供了一个框架,叫Datasource。我目前就在学习这个框架的内容,并尝试着用Spark Datesource这个框架实现一个与自建数据源连接的功能。

下面就是我通过学习记录的内容。

1.概念介绍

1.1 什么是Spark Datasource

Spark Datasource是连接外部数据源和Spark引擎的框架,是一个连接器。可以利用这个连接器来对外部的数据源进行一个读写的操作。华为云的DLI支持原生Spark的Datasource能力,并在其基础上进行了扩展。

DLI利用Spark Datasource构建的功能为跨源连接。当前跨源连接分为以下两种:

经典型跨源连接

DLI 经典型跨源连接可用于访问CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS数据源。

增强型跨源连接

DLI 增强型跨源连接底层采用对等连接,直接打通DLI集群与目的数据源的vpc网络,通过点对点的方式实现数据互通,能够提供比经典型跨源更加灵活的使用场景与更加强劲的性能。增强型跨源支持所有DLI服务已实现的跨源业务,包括CloudTable的Hbase和OpenTSDB,MRS的OpenTSDB,DWS,RDS,CSS,DCS,DDS等数据源。并且通过Spark作业方式能够实现与自建数据源之间的访问。

1.2 Spark Datasource如何使用?

当你使用华为云DLI的跨源连接服务时,首先需要按照官网上面的知识将跨源连接创建好,将队列绑定好。然后就可以创建作业,然后用绑定好的这个队列来运行你的作业。本文主要讲述的是Spark Datasource,所以具体使用讲的是spark作业相关,以scala语言为例。

利用spark作业读取外部数据源的模式总共有两种:

DataFrame模式

利用DataFrame的API读取外部数据源的方式如下:

val jdbcDF = sparkSession .read //表示是读取数据(write就是写) .format("jdbc") //驱动类,这里连接的是jdbc数据源 .option("url", "jdbc:postgresql:dbserver") //option表示的是填入的参数。 .load()

SparkSql模式

利用Spark SQL的方式读取外部数据源的方式如下:

sparkSession.sql(" CREATE TABLE IF NOT EXISTS dli_to_rds //创建的spark sql表名 USING JDBC OPTIONS //驱动类,表明连接的是jdbc数据源 ( 'url'='jdbc:mysql://to-rds-1174404209-cA37siB6.datasource.com:3306', 'driver'='com.mysql.jdbc.Driver' //上述是数据源的一些具体参数。 )") sparkSession.sql("select * from dli_to_rds")

因为sql的简单易用以及易上手性,所以我们推荐大家使用Spark SQL模式。

2.运行原理

2.1 读流程

def read: DataFrameReader = new DataFrameReader(self)

read对应的是DataFrameReader,这是Spark Datasource读取外部数据源的入口。当我们使用sparkSession.read的时候,就会构造出一个DataFrameReader,这是后面参数设置的基础,其中内置了读取多种数据源的方法,还包括参数配置的接口。

def format(source: String): DataFrameReader = {this.source = source}

format需要填入的是数据源的名称,表明连接的是什么连接源。Spark原生支持的数据源有csv、orc、parquet、jdbc等,通过在format这里填写来表示连接的数据源具体是什么。除此之外,format这里也是匹配自建数据源的一个重要窗口。当我们需要连接自己的数据源时,需要设置源的名字是什么,也就是source是什么,设置好以后通过format这里的填写,Spark Datasource框架会利用Spi机制连接创建我们所需要的连接。

def option(key: String, value: String): DataFrameReader = { this.extraOptions = this.extraOptions + (key -> value)}

option的作用就是具体的参数的填写。以连接一个jdbc的数据源为例,需要填入的参数有:

url 这是jdbc数据源的host地址。

dbtable 这是具体要读取的表的表名。

user 这是数据源的用户名。

password 这是数据源的密码。

driver 这是数据源的驱动器,jdbc的驱动器就是com.mysql.jdbc.Driver。

.load()

最后通过这个load的方法真正建立Spark与数据源之间的连接,并且得到一个DataFrame。

解读Spark Datasource

2.2 写流程

def write: DataFrameWriter[T] = {new DataFrameWriter[T](this)}

写的流程是与读流程类似的,区别的是write返回的是一个DataFrameWrite类型对象,这是Spark Datasource写数据的入口。

2.3 连接是如何创建的?

这里主要以读取华为云JDBC数据源流程为例来讲解。

如果不关注Spark Datasource内部是如何运行的话,那么对用使用者来说,load完就是成功建立起连接并读取到外部数据源中的数据了。但是,如果关注Spark Datasource内部是如何运作的话,那么你就会知道,load对于这个流程来说,仅仅是个开始。

def load(paths: String*): DataFrame = { DataSource.lookupDataSourceV2(source, ....).map {......}.getOrElse(loadV1Source(paths: _*)) }

与load相关的函数主要有两个:

lookupDataSourceV2

这个函数的主要作用为将source name匹配为RelationProvider的名字。如果source name为jdbc,那么匹配到的名字就是“org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider”。

loadV1Source

这个函数的主要作用为利用上面函数得到的RelationProvider建立与外部数据源之间的连接,并读取option中设置的数据,最终返回一个包含了我们希望读取到的数据的DataFrame。

private def loadV1Source(paths: String*) = { ... // Code path for data source v1. sparkSession.baseRelationToDataFrame((...).resolveRelation()) }

loadV1Source中调用的函数主要为二:

resolveRelation

这个函数就是通过上面匹配后的source name来返回一个JdbcRelationProvider。在这个函数的内部,通过匹配来判断获取的RelationProvider类型,如果是外部的自建数据源,则会利用java的Spi机制来获取外部的RelationProvider。每个RelationProvider对象中都有着createRelation的功能,在这个函数内部,通过调用这些createRelation方法建立起了与外部数据源的连接,并利用之前的option获取了相关的数据信息。

baseRelationToDataFrame

这个函数则是将RelationProvider转换为DataFrame。

到这里,一个与外部数据源建立连接并获取相关数据的过程就完成了。

3.DLI是如何使用Spark Datasource的

DLI支持原生Spark的DataSource能力,并在其基础上进行了相应的扩展,能够利用spark作业去访问其他华为云的数据源并导入、查询和分析处理其中的数据。

目前支持DLI跨源访问的服务有:

云搜索服务CSS

分布式缓存服务DCS

文档数据库服务DDS

文档数据库服务DDS

云数据库RDS

MapReduce服务MRS

云数据库RDS

上述服务中,CSS集群存在着安全和非安全两种情况、MRS集群存在着是否开启Kerberos认证的情况,DLI均支持。只需要按照相关的指南配置连接文件和参数,即可利用DLI对CSS、MRS集群进行数据的操作。

数据湖探索 DLI

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:代理服务器和 Web 服务器通信中的 504 问题
下一篇:Cassandra copy命令使用指南
相关文章