Java的语言特点是什么(java语言的主要特点是什么)
540
2022-05-30
Sqoop Java Client API 使用指南
简介
Sqoop是一个在Hadoop和关系型数据库之间被用来传输数据的工具,官网地址sqoop.apache.org,当前最新版本是2.0,本文根据官网文档,基于1.99.7版本,简单介绍其Java Client API使用。接下来要介绍的所有方法都在sqoop源码中的SqoopClient类中被打包。
迁移流程
sqoop的迁移流程简单来说就是将源端连接器的数据通过sqoop迁移到目的端,这个数据可能是mysql的一张表,也可能是hdfs上一个文件。具体来说可分为以下三步:
根据sqoop指定的connector name,分别创建源端(From)和目的端(To)对应的Link Object;
根据From和To连接器的名字,创建一个sqoop作业;
根据作业名,启动作业。
开发指南
本文主要介绍API开发指南,已假定你已准备好jdk环境,maven环境,以及sqoop server。
根据默认的url初始化一个SqoopClient对象
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
如果想要修改url可调用SqoopClient的setServerUrl方法。
连接器(Connectors)提供了一个可以与数据源交互的平台,一个连接器可以有一个或者多个链接(Link)去连接指定的数据源,如mysql,hive等。因此,Sqoop的迁移过程就可以看做是利用From端的Link读取数据,在由To端的Link写入数据。SqoopClient中提供了创建、修改和删除Link的接口。用户可以根据自己需要去使用。
下面将通过代码简单来介绍这个过程
1) 根据Connector name创建Link,
MLink link = Client.createLink("connectorName");
link.setName("mysql_link");
link.setCreationUser("Ruby");
MLink是一个Model,表示一个Link Object,它又包含了一个MLinkConfig模型,用以保存数据源的连接信息,具体可以查看sqoop源码common包
2) 利用这个新建的Link去连接mysql数据库
MLinkConfig linkConfig = link.getConnectorLinkConfig();
linkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://localhost/xxx");
linkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
linkConfig.getStringInput("linkConfig.username").setValue("root");
linkConfig.getStringInput("linkConfig.password").setValue("root");
3) 保存连接
// save the link object that was filled
Status status = client.saveLink(link);
if(status.canProceed()) {
System.out.println("Created Link with Link Name : " + link.getName());
} else {
System.out.println("Something went wrong creating the link");
}
这里的status.canProceed()返回ture当且仅当连接配置参数验证通过。
4)获取连接
getLink("linkName"),返回指定Link
getLinks(),以一个list返回所有创建的Links。
一个sqoop作业需要有一个From端的Link和一个To端的Link,每个Link的name都是唯一的。新建作业时,首先指定FromLinkName和ToLinkName,然后分别设置MFromJobConfig和ToJobConfig,例如源端的数据库名表名,目的端的数据库名和表名。此外,每个作业还要设置driver configs,例如指定job提交到MapReduce后,需要设置多少个mappers。
下面将用代码详细介绍这个过程:
1) 新建作业,指定作业
String url = "http://localhost:12000/sqoop/";
SqoopClient client = new SqoopClient(url);
//Creating dummy job object
MJob job = client.createJob("fromLinkName", "toLinkName");
job.setName("Vampire");
job.setCreationUser("Buffy");
2)设置From的作业配置
// set the "FROM" link job config values
MFromConfig fromJobConfig = job.getFromJobConfig();
fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
3)设置To的作业配置
// set the "TO" link job config values
MToConfig toJobConfig = job.getToJobConfig();
toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
4)设置driver Config
// set the driver config values
MDriverConfig driverConfig = job.getDriverConfig();
driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");
5)保存job
Status status = client.saveJob(job);
if(status.canProceed()) {
System.out.println("Created Job with Job Name: "+ job.getName());
} else {
System.out.println("Something went wrong creating the job");
}
其中status有三种状态
OK: 状态正常,没有任何告警
WARNING:个别参数验证不通过,但不致命
ERROR:参数验证失败,无法启动作业
6)获取作业
getJob("jobName"),返回指定作业
getJobs(),以一个list返回所有创建的作业。
根据作业名启动作业,如果作业成功启动,状态返回BOOTING或者RUNNING 1)启动作业
//Job start
MSubmission submission = client.startJob("jobName");
System.out.println("Job Submission Status : " + submission.getStatus());
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
System.out.println("Hadoop job id :" + submission.getExternalId());
System.out.println("Job link : " + submission.getExternalLink());
2) 打印作业的统计信息
Counters counters = submission.getCounters();
if(counters != null) {
System.out.println("Counters:");
for(CounterGroup group : counters) {
System.out.print("\t");
System.out.println(group.getName());
for(Counter counter : group) {
System.out.print("\t\t");
System.out.print(counter.getName());
System.out.print(": ");
System.out.println(counter.getValue());
}
}
}
if(submission.getExceptionInfo() != null) {
System.out.println("Exception info : " +submission.getExceptionInfo());
}
2)查看作业状态
//Check job status for a running job
MSubmission submission = client.getJobStatus("jobName");
if(submission.getStatus().isRunning() && submission.getProgress() != -1) {
System.out.println("Progress : " + String.format("%.2f %%", submission.getProgress() * 100));
}
3)停止作业
//Stop a running job
submission.stopJob("jobName");
上述提交到作业的过程是异步的,如果想同步提交作业,可以使用startJob(jobName, callback, pollTime)方法,其中callback可以为null如果你不需要提交状态
参考文献
https://github.com/apache/sqoop/tree/branch1.99.7/docs/src/site/sphinx/dev/ConnectorDevelopment.rst
API Java
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。