扩展Druid.io支持GaussDB T 数据库[转]

网友投稿 902 2022-05-30

转自 http://3ms.huawei.com/km/blogs/details/6220889

为了满足某产品的诉求,需要扩展druid.io使之支持高斯数据库。之前druid.io使用mysql关系数据库来存储元数据,现在主要是将元数据存储方式改为使用高斯数据库存储。

扩展Druid.io支持GaussDB T 数据库[转]

在Druid.io官网上找到Druid.io模块扩展的官方文档:

https://druid.apache.org/docs/latest/development/modules.html

通过分析代码了解到Druid.io为了使得代码具有很强的扩展性使用了Guice框架来进行开发,下面我们来看看如何基于Guice框架扩展Druid.io支持高斯数据库。

1、创建一个高斯元数据存储的模块

在源代码druid\extensions-core目录下创建一个名为gaussdb-metadata-storage的maven模块。

创建类ZenithMetadataStorageModule继承SQLMetadataStorageDruidModule这个元数据存储基类,并实现DruidModule这个模块接口:

package org.gaussdb.metadata.storage; import java.util.List; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Key; import io.druid.guice.LazySingleton; import io.druid.guice.PolyBind; import io.druid.guice.SQLMetadataStorageDruidModule; import io.druid.initialization.DruidModule; import io.druid.metadata.MetadataStorageActionHandlerFactory; import io.druid.metadata.MetadataStorageConnector; import io.druid.metadata.MetadataStorageProvider; import io.druid.metadata.NoopMetadataStorageProvider; import io.druid.metadata.SQLMetadataConnector; public class ZenithMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule {     public static final String TYPE = "gaussdb";          public ZenithMetadataStorageModule()     {         super(TYPE);         // TODO Auto-generated constructor stub     }     @Override     public List getJacksonModules()     {         // TODO Auto-generated method stub         return ImmutableList.of();     }          @Override     public void configure(Binder binder)     {       super.configure(binder);       PolyBind           .optionBinder(binder, Key.get(MetadataStorageProvider.class))           .addBinding(TYPE)           .to(NoopMetadataStorageProvider.class)           .in(LazySingleton.class);       PolyBind           .optionBinder(binder, Key.get(MetadataStorageConnector.class))           .addBinding(TYPE)           .to(ZenithConnector.class)           .in(LazySingleton.class);       PolyBind           .optionBinder(binder, Key.get(SQLMetadataConnector.class))           .addBinding(TYPE)           .to(ZenithConnector.class)           .in(LazySingleton.class);       PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))               .addBinding(TYPE)               .to(ZenithMetadataStorageActionHandlerFactory.class)               .in(LazySingleton.class);     } }

同时需要引入对高斯驱动的依赖

            com.huawei.gauss             com.huawei.gauss.jdbc.ZenithDriver             V300R001C00SPC100B210

2、注册高斯元数据存储模块

2.1、修改配置

Druid.io所有核心扩展可开箱即用。通过将其名称添加到common.runtime.properties配置文件的druid.extensions.loadList属性来加载绑定扩展。

druid.extensions.loadList=["gaussdb-metadata-storage"]

并增加高斯数据库的参数配置如下:

# For GaussDB: druid.metadata.storage.type=gaussdb druid.metadata.storage.connector.connectURI=jdbc:zenith:@10.243.49.xx:1611 druid.metadata.storage.connector.user=druid druid.metadata.storage.connector.password=xxx

注意此处的druid.metadata.storage.type与ZenithMetadataStorageModule代码中的TYPE是对应的。

2.2、需要在jar的META-INF / services目录中打包一个额外的文件注册模块。

通过在src / main / resources目录中创建文件,它应该是一个文本文件,类似:

META-INF/services/org.apache.druid.initialization.DruidModule,

其中包含实现DruidModule的包限定类的新行分隔列表,类似:

org.gaussdb.metadata.storage.ZenithMetadataStorageModule

此时,当jar被添加到类路径或作为扩展时,Druid.io会注意到该文件并将实例化Module的实例。 模块应该有一个默认的构造函数,但是如果你需要访问运行时配置属性,它可以有一个带@Inject的方法来获取从Guice注入的一个Properties对象。

3、验证元数据存储的实现

3.1、打包

distribution工程是专门用来打包的,为了保证druid.io打包的时候将该jar包打入,需要修改distribution工程的pom文件。

在exec-maven-plugin这个插件中增加参数:

-c io.druid.extensions:gaussdb-metadata-storage

最终在druid-0.12.0-bin.tar.gz\druid-0.12.0\extensions\gaussdb-metadata-storage中可以看到jar包及高斯驱动包。

3.2、适配高斯语法

目前为止我们已经可以部署druid了,但是运行的时候会报很多错误和异常,这些错误一般是由于代码中存在与高斯语法不一致的地方导致的,druid.io使用jdbi进行关系数据库的操作,下面一一列出对这块操作的修改。

1、ZenithConnector类中:

package org.gaussdb.metadata.storage; import java.sql.SQLException; import java.util.List; import java.util.Map; import org.apache.commons.dbcp2.BasicDataSource; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; import com.google.common.base.Supplier; import com.google.inject.Inject; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.SQLMetadataConnector; public class ZenithConnector extends SQLMetadataConnector {     private static final Logger log = new Logger(ZenithConnector.class);          private static final String PAYLOAD_TYPE = "BLOB";          private static final String SERIAL_TYPE = "Integer AUTO_INCREMENT";          private static final String QUOTE_STRING = "";          public static final int DEFAULT_STREAMING_RESULT_SIZE = 100;          private final DBI dbi;          @Inject     public ZenithConnector(Supplier config,             Supplier tablesConfigSupplier)     {         super(config, tablesConfigSupplier);                  final BasicDataSource datasource = getDatasource();                  datasource.setDriverClassLoader(getClass().getClassLoader());         datasource.setDriverClassName("com.huawei.gauss.jdbc.ZenithDriver");                  this.dbi = new DBI(datasource);                  log.info("Configured Zenith as metadata storage");     }          @Override     protected String getPayloadType()     {         return PAYLOAD_TYPE;     }          @Override     protected String getSerialType()     {         return SERIAL_TYPE;     }          @Override     public String getQuoteString()     {         return QUOTE_STRING;     }          @Override     protected int getStreamingFetchSize()     {         return DEFAULT_STREAMING_RESULT_SIZE;     }          @Override     public boolean tableExists(Handle handle, String tableName)     {                 try         {             String owner = handle.getConnection().getMetaData().getUserName();             List> list = handle                     .createQuery(                             "select * from dba_tables where owner= :owner and table_name= :tableName")                     .bind("owner", owner.toUpperCase())                     .bind("tableName", tableName.toUpperCase()).list();             log.info("ZenithConnector----" + list.size());             return !list.isEmpty();         }         catch (SQLException e)         {             log.error("table is not Exists");                     }         return false;     }          @Override     public Void insertOrUpdate(final String tableName, final String keyColumn,         final String valueColumn, final String key, final byte[] value)         throws Exception     {         return getDBI().withHandle(new HandleCallback()         {             @Override             public Void withHandle(Handle handle) throws Exception             {                 handle.createStatement(StringUtils.format(                         "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",                         tableName,                         keyColumn,                         valueColumn))                         .bind("key", key)                         .bind("value", value)                         .execute();                 return null;             }         });     }          @Override     public DBI getDBI()     {         return dbi;     }      }

上面代码中有几个关键地方解释下:

PAYLOAD_TYPE = "BLOB" ;//  高斯下CLOB方式不行,必须用BLOB SERIAL_TYPE = "Integer AUTO_INCREMENT";  // 主键自增方式 QUOTE_STRING = " ";  // 高斯对空字符串的处理必须转换为空格 DEFAULT_STREAMING_RESULT_SIZE = 100;  // 高斯中必须不能为0

另外高斯中对查询数据表是否存在时需要使用用户名参数,可以从连接对象中获取:

-

Java 代码

String owner = handle.getConnection().getMetaData().getUserName();

2、解决高斯Do not support addBatch问题

druid-server工程中SQLMetadataConnector类:

public void createTable(final String tableName, final Iterable sql)   {     try {       retryWithHandle(           new HandleCallback()           {             @Override             public Void withHandle(Handle handle)             {              if (!tableExists(handle, tableName)) {                     log.info("Creating table[%s]", tableName); //                  final Batch batch = handle.createBatch();                   for (String s : sql) { //                      log.info(s); //                    batch.add(s);                       handle.execute(s);                   } //                  batch.execute();                   } else {                     log.info("Table[%s] already exists", tableName);                   }                   return null;             }           }       );     }

3、解决字段名与保留关键字冲突问题:

druid-server工程中SQLMetadataConnector类:

将sql语句中所有start改为starttime,end改为endtime

public void createPendingSegmentsTable(final String tableName)   {     createTable(         tableName,         ImmutableList.of(             StringUtils.format(                 "CREATE TABLE %1$s (\n"                 + "  id VARCHAR(255) NOT NULL,\n"                 + "  dataSource VARCHAR(255) NOT NULL,\n"                 + "  created_date VARCHAR(255) NOT NULL,\n"                 + "  starttime VARCHAR(255) NOT NULL,\n"                 + "  endtime VARCHAR(255) NOT NULL,\n"                 + "  sequence_name VARCHAR(255) NOT NULL,\n"                 + "  sequence_prev_id VARCHAR(255),\n"                 + "  sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"                 + "  payload %2$s NOT NULL,\n"                 + "  PRIMARY KEY (id),\n"                 + "  UNIQUE (sequence_name_prev_id_sha1)\n"                 + ")", //                tableName, getPayloadType(), getQuoteString()                 tableName, getPayloadType()             )         )     );   }

druid-server工程中IndexerSQLMetadataStorageCoordinator类:

start和end字符串与高斯关键字冲突,将sql语句中所有start改为starttime,end改为endtime

4、sequence_prev_id改为可以为空。

-

Java 代码

final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;

此处的previousSegmentIdNotNull会作为sequence_prev_id插入,但高斯不支持null或“”字符串的插入,所以sequence_prev_id字段要设置为可以为空。

5、字段名大小写问题:

druid-server工程中SQLMetadataSegmentManager类396行:

stringObjectMap中的key转为大写再取值,因为高斯数据库查询返回的字段名是大写的。

云数据库 GaussDB(for openGauss)

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:鲲鹏开发工程师技能图谱正式发布
下一篇:浏览器常识和http错误码介绍
相关文章