前面说到elasticsearch 7.16集群安装,本文介绍通过springboot 2.6.2集成es的java api对其进行操作。
首先看一下pom文件
pom.xml
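<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.2</version>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>springboot-elasticsearch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-elasticsearch</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <hutool.version>5.7.8</hutool.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>7.16.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.12.3</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.76</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>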
自 Elasticsearch 7.15 起,Java High Level REST Client 已被标记为废弃,官方推荐改用新的 Java API Client(elasticsearch-java,底层基于 Low Level REST Client),所以某些接口的用法发生了变化。下面列出使用新 Java API Client 的一些基本操作:
从application.properties文件读取Elasticsearch配置信息
server.port=8899
spring.application.name=qa-search
elasticsearch.hosts=10.0.2.9:9200,10.0.2.78:9200,10.0.2.211:9200
elasticsearch.username=elastic
elasticsearch.password=elastic
elasticsearch.connection.timeout=10000
elasticsearch.socket.timeout=10000
elasticsearch.connection.request.timeout=10000
配置类
ElasticSearchConfig.java
package com.zh.ch.springboot.elasticsearch.config; import co.elastic.clients.elasticsearch.ElasticsearchClient; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; import com.zh.ch.springboot.elasticsearch.service.ElasticsearchServiceImpl; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.Node; import org.elasticsearch.client.RestClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @Configuration public class ElasticSearchConfig { private final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class); @Value("${elasticsearch.hosts}") public String elasticsearchHost; @Value("${elasticsearch.username}") public String elasticsearchUsername; @Value("${elasticsearch.password}") public String elasticsearchPassword; @Value("${elasticsearch.connection.timeout}") public int elasticsearchConnectionTimeout; @Value("${elasticsearch.socket.timeout}") public int elasticsearchSocketTimeout; @Value("${elasticsearch.connection.request.timeout}") public int getElasticsearchConnectionRequestTimeout; @Bean public ElasticsearchClient elasticsearchClient() { RestClient restClient = RestClient.builder(getESHttpHosts()).setRequestConfigCallback(requestConfigBuilder -> { //设置连接超时时间 requestConfigBuilder.setConnectTimeout(elasticsearchConnectionTimeout); 
requestConfigBuilder.setSocketTimeout(elasticsearchSocketTimeout); requestConfigBuilder.setConnectionRequestTimeout(getElasticsearchConnectionRequestTimeout); return requestConfigBuilder; }).setFailureListener(new RestClient.FailureListener() { //某节点失败,这里可以做一些告警 @Override public void onFailure(Node node) { logger.error(node); } }).setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.disableAuthCaching(); //设置账密 return getHttpAsyncClientBuilder(httpClientBuilder); }).build(); ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper()); return new ElasticsearchClient(transport); } /** * ElasticSearch 连接地址 * 多个逗号分隔 * 示例:127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203 */ private HttpHost[] getESHttpHosts() { String[] hosts = elasticsearchHost.split(","); HttpHost[] httpHosts = new HttpHost[hosts.length]; for (int i = 0; i < httpHosts.length; i++) { String host = hosts[i]; host = host.replaceAll("http://", "").replaceAll("https://", ""); Assert.isTrue(host.contains(":"), String.format("your host %s format error , Please refer to [ 127.0.0.1:9200 ] ", host)); httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1]), "http"); } return httpHosts; } private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) { if (StringUtils.isEmpty(elasticsearchUsername) || StringUtils.isEmpty(elasticsearchPassword)) { return httpClientBuilder; } //账密设置 CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); //es账号密码(一般使用,用户elastic) credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticsearchUsername, elasticsearchPassword)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); return httpClientBuilder; } }
接口类
ElasticsearchService.java
package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;

import java.util.List;
import java.util.Map;

/**
 * Basic index and document operations against Elasticsearch.
 */
public interface ElasticsearchService {

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     */
    boolean existsIndex(String index);

    /**
     * Creates an index.
     *
     * @param index       index name
     * @param aliasename  alias to attach to the new index
     * @param numOfShards number of primary shards
     * @param properties  field mappings keyed by field name
     * @return {@code true} if the creation was acknowledged
     */
    boolean createIndex(String index, String aliasename, int numOfShards, Map<String, Property> properties);

    /**
     * Deletes one or more indices.
     *
     * @param indexList names of the indices to delete
     * @return {@code true} if the deletion was acknowledged
     */
    boolean deleteIndex(List<String> indexList);

    /**
     * Checks whether a document exists.
     *
     * @param index index name
     * @param id    document id
     * @param clazz document type used to deserialize the response
     * @param <T>   document type
     * @return {@code true} if the document was found
     */
    <T> boolean existsDocument(String index, String id, Class<T> clazz);

    /**
     * Saves a document under the given id; an existing document with that id is overwritten.
     *
     * @param index index name
     * @param id    document id
     * @param qa    document to store
     * @param <T>   document type
     * @return the index response
     */
    <T> IndexResponse saveOrUpdateDocument(String index, String id, T qa);

    /**
     * Saves a document without specifying an id.
     *
     * @param index index name
     * @param qa    document to store
     * @param <T>   document type
     * @return the index response
     */
    <T> IndexResponse saveOrUpdateDocument(String index, T qa);

    /**
     * Fetches a document by id.
     *
     * @param index index name
     * @param id    document id
     * @param clazz document type used to deserialize the source
     * @param <T>   document type
     * @return the document source
     */
    <T> T getById(String index, String id, Class<T> clazz);

    /**
     * Fetches several documents by id.
     *
     * @param index  index name
     * @param idList document ids
     * @param clazz  document type used to deserialize each source
     * @param <T>    document type
     * @return the document sources, in the order of {@code idList}
     */
    <T> List<T> getByIdList(String index, List<String> idList, Class<T> clazz);

    /**
     * Runs a paged search over an index.
     *
     * @param index    index name
     * @param pageNo   page number
     * @param pageSize number of hits per page
     * @param clazz    document type used to deserialize the hits
     * @param <T>      document type
     * @return the hits metadata of the search response
     */
    <T> HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz);

    /**
     * Deletes a document by id.
     *
     * @param index index name
     * @param id    document id
     * @return {@code true} if the document was deleted
     */
    boolean deleteById(String index, String id);
}
实现类
ElasticsearchServiceImpl.java
package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch._types.query_dsl.FieldAndFormat;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch._types.query_dsl.QueryStringQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Highlight;
import co.elastic.clients.elasticsearch.core.search.HighlightField;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.*;

/**
 * {@link ElasticsearchService} implementation backed by the Elasticsearch Java API client.
 *
 * <p>Every operation catches {@link IOException}, logs it and returns a failure value
 * ({@code false} or {@code null}) instead of propagating the exception.
 */
@Component
public class ElasticsearchServiceImpl implements ElasticsearchService {

    private static final Log logger = LogFactory.getLog(ElasticsearchServiceImpl.class);

    private ElasticsearchClient client;

    @Autowired
    public void setClient(ElasticsearchClient client) {
        this.client = client;
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists; {@code false} if not or if the request failed
     */
    @Override
    public boolean existsIndex(String index) {
        try {
            ExistsRequest existsRequest = new ExistsRequest.Builder().index(index).build();
            BooleanResponse response = client.indices().exists(existsRequest);
            return response.value();
        } catch (IOException e) {
            logger.error("There is an error while getting index", e);
        }
        return false;
    }

    /**
     * Creates an index with the given mappings and shard count, and attaches an alias
     * flagged as the write index.
     *
     * @param indexName   index name
     * @param aliasesName alias to attach
     * @param numOfShards number of primary shards
     * @param properties  field mappings keyed by field name
     * @return {@code true} if the creation was acknowledged; {@code false} otherwise or on I/O failure
     */
    @Override
    public boolean createIndex(String indexName, String aliasesName, int numOfShards, Map<String, Property> properties) {
        try {
            TypeMapping typeMapping = new TypeMapping.Builder().properties(properties).build();
            IndexSettings indexSettings = new IndexSettings.Builder()
                    .numberOfShards(String.valueOf(numOfShards))
                    .build();
            CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder()
                    .index(indexName)
                    .aliases(aliasesName, new Alias.Builder().isWriteIndex(true).build())
                    .mappings(typeMapping)
                    .settings(indexSettings)
                    .build();
            CreateIndexResponse response = client.indices().create(createIndexRequest);
            return response.acknowledged();
        } catch (IOException e) {
            logger.error("There is an error while creating index", e);
        }
        return false;
    }

    /**
     * Deletes the given indices.
     *
     * @param indexList names of the indices to delete
     * @return {@code true} if the deletion was acknowledged; {@code false} otherwise or on I/O failure
     */
    @Override
    public boolean deleteIndex(List<String> indexList) {
        try {
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(indexList).build();
            DeleteIndexResponse response = client.indices().delete(deleteIndexRequest);
            return response.acknowledged();
        } catch (IOException e) {
            logger.error("There is an error while deleting index", e);
        }
        return false;
    }

    /**
     * Checks whether a document exists by issuing a get request.
     *
     * @param index index name
     * @param id    document id
     * @param clazz document type used to deserialize the response
     * @return {@code true} if the document was found; {@code false} if not or on I/O failure
     */
    @Override
    public <T> boolean existsDocument(String index, String id, Class<T> clazz) {
        try {
            GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build();
            GetResponse<T> getResponse = client.get(getRequest, clazz);
            return getResponse.found();
        } catch (IOException e) {
            logger.error("There is an error while judging if the document exists", e);
        }
        return false;
    }

    /**
     * Indexes a document under an explicit id.
     *
     * @param index index name
     * @param id    document id
     * @param t     document to store
     * @return the index response, or {@code null} on I/O failure
     */
    @Override
    public <T> IndexResponse saveOrUpdateDocument(String index, String id, T t) {
        try {
            IndexRequest<T> indexRequest = new IndexRequest.Builder<T>().index(index).id(id).document(t).build();
            return client.index(indexRequest);
        } catch (IOException e) {
            logger.error("There is an error while saving the document", e);
        }
        return null;
    }

    /**
     * Indexes a document without specifying an id.
     *
     * @param index index name
     * @param t     document to store
     * @return the index response, or {@code null} on I/O failure
     */
    @Override
    public <T> IndexResponse saveOrUpdateDocument(String index, T t) {
        try {
            IndexRequest<T> indexRequest = new IndexRequest.Builder<T>().index(index).document(t).build();
            return client.index(indexRequest);
        } catch (IOException e) {
            logger.error("There is an error while saving the document", e);
        }
        return null;
    }

    /**
     * Fetches a document by id.
     *
     * @param index index name
     * @param id    document id
     * @param clazz document type used to deserialize the source
     * @return the source of the get response, or {@code null} on I/O failure
     */
    @Override
    public <T> T getById(String index, String id, Class<T> clazz) {
        try {
            GetRequest getRequest = new GetRequest.Builder().index(index).id(id).build();
            GetResponse<T> getResponse = client.get(getRequest, clazz);
            return getResponse.source();
        } catch (IOException e) {
            logger.error("There is an error while getting the document", e);
        }
        return null;
    }

    /**
     * Fetches several documents by id, issuing one get request per id.
     *
     * @param index  index name
     * @param idList document ids
     * @param clazz  document type used to deserialize each source
     * @return the sources in the order of {@code idList}, or {@code null} on I/O failure
     */
    @Override
    public <T> List<T> getByIdList(String index, List<String> idList, Class<T> clazz) {
        try {
            List<T> tList = new ArrayList<>(idList.size());
            for (String id : idList) {
                tList.add(client.get(new GetRequest.Builder().index(index).id(id).build(), clazz).source());
            }
            return tList;
        } catch (IOException e) {
            logger.error("There is an error while getting the document list", e);
        }
        return null;
    }

    /**
     * Runs a paged search over an index.
     *
     * <p>{@code pageNo} is treated as a 0-based page index. The search {@code from}
     * parameter is a document offset, so it is computed as {@code pageNo * pageSize};
     * passing the page number itself would make consecutive pages overlap.
     *
     * @param index    index name
     * @param pageNo   0-based page index
     * @param pageSize number of hits per page
     * @param clazz    document type used to deserialize the hits
     * @return the hits metadata of the search response, or {@code null} on I/O failure
     */
    @Override
    public <T> HitsMetadata<T> searchByPages(String index, Integer pageNo, Integer pageSize, Class<T> clazz) {
        try {
            // Only compute an offset when both values are present; otherwise pass pageNo through untouched.
            Integer from = (pageNo == null || pageSize == null)
                    ? pageNo
                    : Integer.valueOf(Math.multiplyExact(pageNo.intValue(), pageSize.intValue()));
            SearchRequest searchRequest = new SearchRequest.Builder()
                    .index(Collections.singletonList(index))
                    .from(from)
                    .size(pageSize)
                    .build();
            SearchResponse<T> searchResponse = client.search(searchRequest, clazz);
            return searchResponse.hits();
        } catch (IOException e) {
            logger.error("There is an error while searching by pages", e);
        }
        return null;
    }

    /**
     * Deletes a document by id.
     *
     * @param index index name
     * @param id    document id
     * @return {@code true} if the response result is {@code "deleted"}; {@code false} otherwise or on I/O failure
     */
    @Override
    public boolean deleteById(String index, String id) {
        try {
            DeleteRequest deleteRequest = new DeleteRequest.Builder().index(index).id(id).build();
            DeleteResponse deleteResponse = client.delete(deleteRequest);
            return "deleted".equals(deleteResponse.result().jsonValue());
        } catch (IOException e) {
            logger.error("There is an error while deleting id document", e);
        }
        return false;
    }
}
测试类
package com.zh.ch.springboot.elasticsearch.service;

import co.elastic.clients.elasticsearch._types.mapping.DateProperty;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import com.alibaba.fastjson.JSON;
import com.zh.ch.springboot.elasticsearch.SpringbootElasticsearchApplication;
import com.zh.ch.springboot.elasticsearch.bean.QA;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Integration tests for {@link ElasticsearchServiceImpl}; they talk to the Elasticsearch
 * cluster configured for the application and print the results.
 *
 * <p>The tests use the JUnit 5 {@code @Test} annotation, so the JUnit 4
 * {@code @RunWith(SpringRunner.class)} annotation is intentionally not applied.
 */
@SpringBootTest(classes = SpringbootElasticsearchApplication.class)
class ElasticsearchServiceImplTest {

    private ElasticsearchServiceImpl elasticsearchService;

    @Autowired
    public void setElasticsearchService(ElasticsearchServiceImpl elasticsearchService) {
        this.elasticsearchService = elasticsearchService;
    }

    @Test
    void existsIndex() {
        String index = "es_index_test_1";
        boolean existsIndexFlag = elasticsearchService.existsIndex(index);
        System.out.printf("%s 是否存在 %b%n", index, existsIndexFlag);
    }

    @Test
    void createIndex() {
        String index = "es_index_test_1";
        String indexAliasesName = "es_index_test_1_aliases";
        Map<String, Property> map = new HashMap<>();
        map.put("id", new Property(new DateProperty.Builder().index(true).store(true).build()));
        boolean createIndexFlag = elasticsearchService.createIndex(index, indexAliasesName, 12, map);
        System.out.printf("创建索引, index:%s , createIndexFlag:%b%n", index, createIndexFlag);
    }

    @Test
    void deleteIndex() {
        List<String> indexList = new ArrayList<>();
        indexList.add("es_index_test_1");
        boolean deleteIndexFlag = elasticsearchService.deleteIndex(indexList);
        System.out.printf("删除 %s 索引是否成功 %b", indexList, deleteIndexFlag);
    }

    @Test
    void existsDocument() {
        String index = "bigdata";
        String id = "1";
        boolean existsDocumentFlag = elasticsearchService.existsDocument(index, id, QA.class);
        System.out.printf("文档 index为 %s, id为 %s 是否存在于es中: %b",index, id, existsDocumentFlag);
    }

    @Test
    void saveOrUpdateDocument() {
        QA qa = new QA();
        qa.setType_name("flink");
        qa.setTitle("# Checkpoint 做恢复的过程中出现Savepoint failed with error \"Checkpoint expired before completing\"的问题");
        qa.setContent("该问题字面意思看是由于flink在做cp落地hdfs的时候,出现超时失败的问题\n" +
                "\n" +
                "\t/** The default timeout of a checkpoint attempt: 10 minutes. */\n" +
                "\tpublic static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;\n" +
                "可以看到是超时失败的问题(默认超时10min失败)。\n" +
                "\n" +
                "## 1.原因分析与排查:\n" +
                "第一种情况:自身设置了超时时间(自身做持久化的内存也不大的情况)\n" +
                "\n" +
                "//例如:仅仅间隔6sec就做持久化\n" +
                "env.getCheckpointConfig.setCheckpointTimeout( 6 * 1000) //6sec内完成checkpoint\n" +
                "![](https://search.lrting.top/images/20190313190334179.png)" +
                "如上图所示:查看Flink-web-ui的DashBoard中看到checkpoint栏目下的history中各个失败的checkpoint快照,然后查看失败时候,各个算子中使用时间,总有一些大部分完成的算子,但是另外一部分算子做checkpoint时候出现失败的情况。此时要做的是查看这部分算子的计算处理速度慢的原因。\n" +
                "\n" +
                "参考这个:[](http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepoint-failed-with-error-quot-Checkpoint-expired-before-completing-quot-td24177.html)\n" +
                "![](https://search.lrting.top/images/2019031211302593.png" +
                "## 2.因此,解决办法在于:\n" +
                "1.检查是否数据倾斜;(比如:数据倾斜导致的个别算子计算能力差异巨大)\n" +
                "\n" +
                "2.开启并发增长个别处理慢的算子的处理能力;\n" +
                "\n" +
                "3.检查代码中是否存在计算速度特别慢的操作(如读写磁盘、数据库、网络传输、创建大对象等耗时操作)\n" +
                "\n" +
                "部分检查点成功问题(刚开始成功,过了几个检查点之后持久化失败的问题,参考https://blog.csdn.net/fct2001140269/article/details/88715808)\n");
        SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        qa.setDt(f.format(new Date()));
        qa.setUser_id(1);
        IndexResponse indexResponse = elasticsearchService.saveOrUpdateDocument("bigdata", qa);
        System.out.printf("插入书籍是否成功 %s", indexResponse.result());
    }

    @Test
    void getById() {
        String index = "bigdata";
        QA qa = elasticsearchService.getById(index, "1", QA.class);
        System.out.println(JSON.toJSONString(qa));
    }

    @Test
    void getByIdList() {
        String index = "bigdata";
        List<String> idList = new ArrayList<>();
        idList.add("1");
        idList.add("2");
        List<QA> qaList = elasticsearchService.getByIdList(index, idList, QA.class);
        for (QA qa : qaList) {
            System.out.println(JSON.toJSONString(qa));
        }
    }

    @Test
    void searchByPages() {
        String index = "bigdata";
        Integer pageNo = 0;
        Integer pageSize = 10;
        HitsMetadata<QA> qaList = elasticsearchService.searchByPages(index, pageNo, pageSize, QA.class);
        System.out.println(qaList.hits().size());
    }

    // NOTE(review): searchByQuery is not declared in the ElasticsearchService /
    // ElasticsearchServiceImpl listings accompanying this test - confirm it exists in the
    // full project before running.
    @Test
    void searchByQuery() {
        String queryString = "大数据";
        HitsMetadata<QA> qaList = elasticsearchService.searchByQuery(queryString, QA.class);
        for (Hit<QA> hit : qaList.hits()) {
            System.out.println(hit.highlight());
        }
    }

    @Test
    void deleteById() {
        String index = "bigdata";
        String id = "ee00B34BwyhfTnq-1xYe";
        boolean deleteByIdFlag = elasticsearchService.deleteById(index, id);
        System.out.println(deleteByIdFlag);
    }
}
完整代码示例(https://git.lrting.top/xiaozhch5/springboot-elasticsearch.git):
API Elasticsearch Spring Boot
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。