以java API方式提交spark作业

网友投稿 1391 2022-05-30

一、文章背景

在初期学习Spark的时候是以命令行的方式提交Job到集群环境中运行的,试想当一个作业需要重复去执行的时候且linux脚本不会搞,是不是很尴尬!随着对spark的深入了解和查看官网提供的文档示例,了解到spark提供了以sparkLauncher作为spark job提交的唯一入口,可以用Java API编程的方式提交spark job,可以在IDEA中通过创建sparkLauncher对象,进行参数设置后直接点击Run 运行包含Job的Main类就能成功提交job进行运行。还可以集成到spring项目中,避免了以拼接cmd命令的方式集成到项目中带来的安全隐患。

二、实现样例

2.1 主函数样例

public class TestSparkLauncher {

public static void main(String[] args) throws IOException, InterruptedException {

// 用于配置运行spark的环境变量

HashMap env = new HashMap();

env.put("HADOOP_CONF_DIR", "环境上安装的hadoop配置文件目录");

env.put("JAVA_HOME", "环境上的java home");

// 用于指定spark运行时使用的配置文件,默认加载的是环境上安装的spark home下的conf目录

以java API方式提交spark作业

env.put("SPARK_CONF_DIR", "自定义的spark配置文件目录");

SparkLauncher sparkLauncher = new SparkLauncher(env);

sparkLauncher.setAppName("spark job 名称");

sparkLauncher.setAppResource(" spark jar包在hdfs上的路径");

sparkLauncher.setSparkHome("环境上安装的spark路径");

sparkLauncher.setMainClass(" spark jar包的运行主函数名称");

sparkLauncher.setDeployMode("spark 运行模式 client 或 cluster 二选一");

// 提交spark job 获取process

Process process = sparkLauncher.launch();

// client模式下用于输出运行日志

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");

Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");

inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");

Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");

errorThread.start();

System.out.println("Waiting for finish...");

// client模式下用于监控spark job 运行结果

int exitCode = process.waitFor();

System.out.println("Finished! Exit code:" + exitCode);

}

2.2 记录日志线程样例

public class InputStreamReaderRunnable implements Runnable {

private BufferedReader reader;

private String name;

public InputStreamReaderRunnable(InputStream is, String name) {

this.reader = new BufferedReader(new InputStreamReader(is));

this.name = name;

}

public void run() {

System.out.println("InputStream_" + name + ":");

try {

String line = reader.readLine();

while (line != null) {

System.out.println(line);

line = reader.readLine();

}

reader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

三、选用这种方式的优劣

优势:通过SparkLanuncher.lanunch()方法获取一个process进程,然后调用进程的process.waitFor()方法等待线程返回结果,获取的输出信息一切都在掌握之中;

劣势:使用这种方式需要自己管理运行过程中的输出信息,比较麻烦。

四、实现过程中遇到的问题

4.1.运行时找不到java_home

在用于配置spark的运行时环境变量的env集合中添加java_home配置或者在sparkLauncher对象内setJavaHome

4.2.开启kerberos认证后,job提交运行失败

在sparkLauncher对象中setConf中以key-value形式配置认证文件及名称

4.3.spark版本兼容较差,

如果日志文件中出现序列化ID不想等的问题,请查看集成的spring项目中的sparkjar包是否与环境安装的spark版本一致。

以上为项目实现过程demo以及部分问题总结,不足之处,请多多指教。

Java API spark

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:零代码美食分类模型开发
下一篇:《OpenStack高可用集群(上册):原理与架构》—2.4.2 Mirantis OpenStack自定义高可用集群架构
相关文章