MRS Flink使用SQL-Client对接Hive

网友投稿 783 2022-05-30

一、准备环境

1.根据产品文档安装Flink客户端;

2.将sql-client-defaults.yaml放入/opt/client/Flink/flink/conf中

3.将jaas.conf放入/opt/client/Flink/flink/conf中

Client {

com.sun.security.auth.module.Krb5LoginModule required

useKeyTab=false

useTicketCache=true

debug=false;

};

4.添加sql-client.sh中添加在JVM_ARGS参数:

JVM_ARGS="-Djava.security.auth.login.config=/opt/client/Flink/flink/conf/jaas.conf $JVM_ARGS"

二、启动Flink集群

例如:yarn-session.sh -t ssl -d

三、启动SQL-Client

./sql-client.sh embedded -d ./../conf/sql-client-defaults.yaml

四、运行SQL

CREATE TABLE kafkaSourceTable (

order_id VARCHAR,

shop_id VARCHAR,

member_id VARCHAR,

trade_amt DOUBLE

) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = 'order_sql',

'connector.properties.bootstrap.servers' = '10.162.147.217:21005',

'connector.properties.zookeeper.connect' = '10.162.147.217:24002',

'connector.properties.group.id' = 'test-consumer-group',

'connector.startup-mode' = 'latest-offset',

'format.type' = 'json'

);

CREATE TABLE kafkaSinkTable(shop_id VARCHAR, member_id VARCHAR) WITH (

'connector.type' = 'kafka',

'connector.version' = 'universal',

'connector.topic' = 'order_sql',

'connector.properties.bootstrap.servers' = '10.162.147.217:21005',

MRS Flink使用SQL-Client对接Hive

'connector.properties.zookeeper.connect' = '10.162.147.217:24002',

'update-mode' = 'append',

'format.type' = 'json'

);

INSERT INTO

kafkaSinkTable

SELECT

shop_id,

member_id

FROM

kafkaSourceTable;

SELECT

shop_id,

member_id

FROM

kafkaSourceTable;

五、对接Hive

1)修改sql-client-defaults.yaml

catalogs:

- name: myhive

type: hive

hive-conf-dir: /opt/clienrc5/Hive/config

hive-version: 3.1.0

2)在/opt/clienrc5/Hive/config/hive-site.xml添加配置

hive.metastore.sasl.enabled

true

3)启动sql-client

use catalog myhive;

SET table.sql-dialect=hive;

CREATE TABLE IF NOT EXISTS hive_dialect_tbl (

`id` int ,

`name` string ,

`age` int

)

ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

SET table.sql-dialect=default;

CREATE TABLE datagen (

`id` int ,

`name` string ,

`age` int

) WITH (

'connector' = 'datagen',

'rows-per-second'='1'

);

INSERT INTO hive_dialect_tbl SELECT * FROM datagen;

select * from hive_dialect_tbl;

EI企业智能 Flink FusionInsight

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:source insight使用6---Source Insight 4.0中文注释乱码解决办法
下一篇:云脉智慧门禁APP/智慧门禁系统模块介绍
相关文章