MRS 3.X集群Spark on CloudTable使用指导

网友投稿 747 2022-05-30

1. 参考官方文档-使用2.x及之前的开发指南(https://support.huaweicloud.com/devg-mrs/mrs_06_0187.html),开发指南(适用于2.x及之前)->Spark应用开发章节->Spark on HBase,将样例代码的pom文件hbase.version 配置成1.3.1-mrs-1.9.0,样例大妈中对应的CreateTable、TableInputData、TableOutputData类换成如下内容。(其中hbase.ZooKeeper.quorum值根据不同CloudTable集群会不同,具体的获取联系CloudTableSRE)

CreateTable类如下:

package com.huawei.bigdata.spark.examples;

import java.io.File;

import java.io.IOException;

import java.util.Iterator;

import java.util.List;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

import org.apache.hadoop.hbase.util.Base64;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import com.huawei.hadoop.security.LoginUtil;

/**

* Get data from table.

*/

public class TableOutputData {

public static void main(String[] args) throws IOException {

Configuration hadoopConf = new Configuration();

if("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))){

//security mode

final String userPrincipal = "sparkuser";

final String USER_KEYTAB_FILE = "user.keytab";

String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;

String krbFile = filePath + "krb5.conf";

String userKeyTableFile = filePath + USER_KEYTAB_FILE;

String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";

String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";

String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);

LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);;

}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

// Create the configuration parameter to connect the HBase. The hbase-site.xml must be included in the classpath.

SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");

JavaSparkContext jsc = new JavaSparkContext(conf);

Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";

String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";

String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";

hbConf.set("hbase.zookeeper.quorum",ip);

hbConf.set("hbase.zookeeper.quorum",ip1);

hbConf.set("hbase.zookeeper.quorum",ip2);

// Declare the information of the table to be queried.

Scan scan = new org.apache.hadoop.hbase.client.Scan();

scan.addFamily(Bytes.toBytes("info"));

org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);

String scanToString = Base64.encodeBytes(proto.toByteArray());

hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");

hbConf.set(TableInputFormat.SCAN, scanToString);

// Obtain the data in the table through the Spark interface.

JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

// Traverse every row in the HBase table and print the results.

List> rddList = rdd.collect();

for (int i = 0; i < rddList.size(); i++) {

Tuple2 t2 = rddList.get(i);

ImmutableBytesWritable key = t2._1();

Iterator it = t2._2().listCells().iterator();

while (it.hasNext()) {

Cell c = it.next();

String family = Bytes.toString(CellUtil.cloneFamily(c));

String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));

String value = Bytes.toString(CellUtil.cloneValue(c));

Long tm = c.getTimestamp();

System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);

}

}

jsc.stop();

}

}

TableInputData类如下:

package com.huawei.bigdata.spark.examples;

import java.io.File;

import java.io.IOException;

import java.util.List;

import scala.Tuple4;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.TableName;

import org.apache.hadoop.hbase.client.*;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import com.huawei.hadoop.security.LoginUtil;

/**

* Input data to hbase table.

*/

public class TableInputData {

public static void main(String[] args) throws IOException {

Configuration hadoopConf = new Configuration();

if("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))){

//security mode

final String userPrincipal = "sparkuser";

final String USER_KEYTAB_FILE = "user.keytab";

String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;

String krbFile = filePath + "krb5.conf";

String userKeyTableFile = filePath + USER_KEYTAB_FILE;

String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";

String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";

String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);

LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);;

}

// Create the configuration parameter to connect the HBase.

SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");

JavaSparkContext jsc = new JavaSparkContext(conf);

Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";

String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";

String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";

hbConf.set("hbase.zookeeper.quorum",ip);

hbConf.set("hbase.zookeeper.quorum",ip1);

hbConf.set("hbase.zookeeper.quorum",ip2);

// Declare the information of the table.

Table table = null;

String tableName = "shb1";

byte[] familyName = Bytes.toBytes("info");

Connection connection = null;

try {

// Connect to the HBase.

connection = ConnectionFactory.createConnection(hbConf);

// Obtain the table object.

table = connection.getTable(TableName.valueOf(tableName));

List> data = jsc.textFile(args[0]).map(

new Function>() {

public Tuple4 call(String s) throws Exception {

String[] tokens = s.split(",");

return new Tuple4(tokens[0], tokens[1], tokens[2], tokens[3]);

}

}).collect();

Integer i = 0;

for (Tuple4 line : data) {

Put put = new Put(Bytes.toBytes("row" + i));

put.addColumn(familyName, Bytes.toBytes("c11"), Bytes.toBytes(line._1()));

put.addColumn(familyName, Bytes.toBytes("c12"), Bytes.toBytes(line._2()));

put.addColumn(familyName, Bytes.toBytes("c13"), Bytes.toBytes(line._3()));

put.addColumn(familyName, Bytes.toBytes("c14"), Bytes.toBytes(line._4()));

i += 1;

table.put(put);

}

} catch (IOException e) {

e.printStackTrace();

} finally {

if (table != null) {

try {

// Close the HTable.

table.close();

} catch (IOException e) {

e.printStackTrace();

}

}

if (connection != null) {

try {

// Close the HBase connection.

connection.close();

} catch (IOException e) {

e.printStackTrace();

}

}

jsc.stop();

}

}

}

TableOutputData类如下:

package com.huawei.bigdata.spark.examples;

import java.io.File;

import java.io.IOException;

import java.util.Iterator;

import java.util.List;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.Cell;

import org.apache.hadoop.hbase.CellUtil;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

import org.apache.hadoop.hbase.protobuf.ProtobufUtil;

import org.apache.hadoop.hbase.util.Base64;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaSparkContext;

import com.huawei.hadoop.security.LoginUtil;

/**

* Get data from table.

*/

public class TableOutputData {

public static void main(String[] args) throws IOException {

Configuration hadoopConf = new Configuration();

if("kerberos".equalsIgnoreCase(hadoopConf.get("hadoop.security.authentication"))){

//security mode

final String userPrincipal = "sparkuser";

final String USER_KEYTAB_FILE = "user.keytab";

String filePath = System.getProperty("user.dir") + File.separator + "conf" + File.separator;

String krbFile = filePath + "krb5.conf";

String userKeyTableFile = filePath + USER_KEYTAB_FILE;

String ZKServerPrincipal = "zookeeper/hadoop.hadoop.com";

String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";

String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";

LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, userPrincipal, userKeyTableFile);

LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZKServerPrincipal);

LoginUtil.login(userPrincipal, userKeyTableFile, krbFile, hadoopConf);;

}

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

System.setProperty("spark.kryo.registrator", "com.huawei.bigdata.spark.examples.MyRegistrator");

// Create the configuration parameter to connect the HBase. The hbase-site.xml must be included in the classpath.

SparkConf conf = new SparkConf().setAppName("CollectFemaleInfo");

JavaSparkContext jsc = new JavaSparkContext(conf);

Configuration hbConf = HBaseConfiguration.create(jsc.hadoopConfiguration());

String ip = "cloudtable-yan-zk1-4TZKF7vD.mycloudtable.com";

String ip1 = "cloudtable-yan-zk2-xOogaSa3.mycloudtable.com";

String ip2 = "cloudtable-yan-zk3-7ajHEN3S.mycloudtable.com";

hbConf.set("hbase.zookeeper.quorum",ip);

hbConf.set("hbase.zookeeper.quorum",ip1);

hbConf.set("hbase.zookeeper.quorum",ip2);

// Declare the information of the table to be queried.

Scan scan = new org.apache.hadoop.hbase.client.Scan();

scan.addFamily(Bytes.toBytes("info"));

org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Scan proto = ProtobufUtil.toScan(scan);

String scanToString = Base64.encodeBytes(proto.toByteArray());

hbConf.set(TableInputFormat.INPUT_TABLE, "shb1");

hbConf.set(TableInputFormat.SCAN, scanToString);

// Obtain the data in the table through the Spark interface.

JavaPairRDD rdd = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

MRS 3.X集群Spark on CloudTable使用指导

// Traverse every row in the HBase table and print the results.

List> rddList = rdd.collect();

for (int i = 0; i < rddList.size(); i++) {

Tuple2 t2 = rddList.get(i);

ImmutableBytesWritable key = t2._1();

Iterator it = t2._2().listCells().iterator();

while (it.hasNext()) {

Cell c = it.next();

String family = Bytes.toString(CellUtil.cloneFamily(c));

String qualifier = Bytes.toString(CellUtil.cloneQualifier(c));

String value = Bytes.toString(CellUtil.cloneValue(c));

Long tm = c.getTimestamp();

System.out.println(" Family=" + family + " Qualifier=" + qualifier + " Value=" + value + " TimeStamp=" + tm);

}

}

jsc.stop();

}

}

2. 集群的各个节点的/etc/hosts文件添加如下内容(据不同CloudTable集群会不同,具体的获取联系CloudTable服务的SRE):

3. 找一个1.9.0集群将/opt/client/Spark/spark/jars/下面的这些1.3.1版本的hbase的包

hbase-client-1.3.1-mrs-1.9.0.jar, hbase-common-1.3.1-mrs-1.9.0.jar, hbase-hadoop2-compat-1.3.1-mrs-1.9.0.jar, hbase-protocol-1.3.1-mrs-1.9.0.jar,  hbase-server-1.3.1-mrs-1.9.0.jar, htrace-core-3.1.0-incubating.jar

都换到3.0.1集群的/opt/client/Spark2x/spark/jars/下面,将/opt/client/Spark2x/spark/jars/下面2.3.2版本的hbase包都移掉。

按照如下步骤执行官方样例代码即可完成:

MapReduce spark 表格存储服务 CloudTable

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:2021最新版SpringCloud高频面试题分享
下一篇:X-Forward-For 看破红尘,代理 IP 无所遁形!
相关文章