深入浅出SqoopJava Client API 使用指南

网友投稿 471 2022-05-30

Sqoop Java Client API 使用指南

简介

Sqoop是一个在Hadoop和关系型数据库之间被用来传输数据的工具,官网地址sqoop.apache.org,当前最新版本是2.0,本文根据官网文档,基于1.99.7版本,简单介绍其Java Client API使用。接下来要介绍的所有方法都在sqoop源码中的SqoopClient类中被打包。

迁移流程

sqoop的迁移流程简单来说就是将源端连接器的数据通过sqoop迁移到目的端,这个数据可能是mysql的一张表,也可能是hdfs上一个文件。具体来说可分为以下三步:

根据sqoop指定的connector name,分别创建源端(From)和目的端(To)对应的Link Object;

根据From和To连接器的名字,创建一个sqoop作业;

根据作业名,启动作业。

开发指南

本文主要介绍API开发指南,已假定你已准备好jdk环境,maven环境,以及sqoop server。

org.apache.sqoop

sqoop-client

${requestedVersion}

根据默认的url初始化一个SqoopClient对象

String url = "http://localhost:12000/sqoop/";

SqoopClient client = new SqoopClient(url);

如果想要修改url可调用SqoopClient的setServerUrl方法。

连接器(Connectors)提供了一个可以与数据源交互的平台,一个连接器可以有一个或者多个链接(Link)去连接指定的数据源,如mysql,hive等。因此,Sqoop的迁移过程就可以看做是利用From端的Link读取数据,在由To端的Link写入数据。SqoopClient中提供了创建、修改和删除Link的接口。用户可以根据自己需要去使用。

下面将通过代码简单来介绍这个过程

1) 根据Connector name创建Link,

MLink link = Client.createLink("connectorName");

link.setName("mysql_link");

link.setCreationUser("Ruby");

MLink是一个Model,表示一个Link Object,它又包含了一个MLinkConfig模型,用以保存数据源的连接信息,具体可以查看sqoop源码common包

2) 利用这个新建的Link去连接mysql数据库

MLinkConfig linkConfig = link.getConnectorLinkConfig();

linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/xxx");

linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");

linkConfig.getStringInput("linkConfig.username").setValue("root");

linkConfig.getStringInput("linkConfig.password").setValue("root");

3) 保存连接

// save the link object that was filled

Status status = client.saveLink(link);

if(status.canProceed()) {

System.out.println("Created Link with Link Name : " + link.getName());

} else {

System.out.println("Something went wrong creating the link");

}

这里的status.canProceed()返回ture当且仅当连接配置参数验证通过。

4)获取连接

getLink("linkName"),返回指定Link

getLinks(),以一个list返回所有创建的Links。

一个sqoop作业需要有一个From端的Link和一个To端的Link,每个Link的name都是唯一的。新建作业时,首先指定FromLinkName和ToLinkName,然后分别设置MFromJobConfig和ToJobConfig,例如源端的数据库名表名,目的端的数据库名和表名。此外,每个作业还要设置driver configs,例如指定job提交到MapReduce后,需要设置多少个mappers。

下面将用代码详细介绍这个过程:

1) 新建作业,指定作业

String url = "http://localhost:12000/sqoop/";

SqoopClient client = new SqoopClient(url);

//Creating dummy job object

MJob job = client.createJob("fromLinkName", "toLinkName");

job.setName("Vampire");

job.setCreationUser("Buffy");

2)设置From的作业配置

// set the "FROM" link job config values

MFromConfig fromJobConfig = job.getFromJobConfig();

fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");

fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");

fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");

3)设置To的作业配置

// set the "TO" link job config values

MToConfig toJobConfig = job.getToJobConfig();

toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");

4)设置driver Config

// set the driver config values

MDriverConfig driverConfig = job.getDriverConfig();

driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

5)保存job

Status status = client.saveJob(job);

if(status.canProceed()) {

System.out.println("Created Job with Job Name: "+ job.getName());

} else {

System.out.println("Something went wrong creating the job");

}

其中status有三种状态

OK: 状态正常,没有任何告警

WARNING:个别参数验证不通过,但不致命

ERROR:参数验证失败,无法启动作业

6)获取作业

getJob("jobName"),返回指定作业

getJobs(),以一个list返回所有创建的作业。

根据作业名启动作业,如果作业成功启动,状态返回BOOTING或者RUNNING 1)启动作业

//Job start

MSubmission submission = client.startJob("jobName");

System.out.println("Job Submission Status : " + submission.getStatus());

if(submission.getStatus().isRunning() && submission.getProgress() != -1) {

System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));

}

System.out.println("Hadoop job id :" + submission.getExternalId());

System.out.println("Job link : " + submission.getExternalLink());

2) 打印作业的统计信息

Counters counters = submission.getCounters();

if(counters != null) {

System.out.println("Counters:");

for(CounterGroup group : counters) {

System.out.print("\t");

System.out.println(group.getName());

for(Counter counter : group) {

System.out.print("\t\t");

System.out.print(counter.getName());

System.out.print(": ");

System.out.println(counter.getValue());

}

}

}

深入浅出Sqoop之Java Client API 使用指南

if(submission.getExceptionInfo() != null) {

System.out.println("Exception info : " +submission.getExceptionInfo());

}

2)查看作业状态

//Check job status for a running job

MSubmission submission = client.getJobStatus("jobName");

if(submission.getStatus().isRunning() && submission.getProgress() != -1) {

System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));

}

3)停止作业

//Stop a running job

submission.stopJob("jobName");

上述提交到作业的过程是异步的,如果想同步提交作业,可以使用startJob(jobName, callback, pollTime)方法,其中callback可以为null如果你不需要提交状态

参考文献

https://github.com/apache/sqoop/tree/branch1.99.7/docs/src/site/sphinx/dev/ConnectorDevelopment.rst

API Java

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Arthas从入门到实践
下一篇:Python Socket 的更多方法介绍
相关文章