ElasticSearch基本命令&python库使用

网友投稿 809 2022-05-30

python连接ES

集群方式

from elasticsearch import Elasticsearch es = Elasticsearch([{'host': 'xxx'}, {'host': 'xx'}, {'host': 'xxx'}], http_auth=('xxx', 'xxx'), timeout=3600)

单点方式

from elasticsearch import Elasticsearch es = Elasticsearch([{'host': 'xxx', 'port': 9000}], timeout=3600)

如何使用SSl或者添加port端口等可以help该API查看,有具体的例子

ES body中的命令

数据库中现在有一条数据为:

{'source': 'abc', 'valid': True}], 'collectTime': '2021-11-02T15:50:30Z', 'confidence': 70.0, 'createdBy': 'def', 'createdTime': '2021-11-02T16:10:55Z', 'creditLevel': 3, 'dataType': 'business', 'hitCount': 2, 'id': 'indicator-d3a548d9eb056666', 'idxKey': 'network-attacks-botnet', 'labels': ['ip'], 'modifiedTime': '2021-11-02T16:10:55Z', 'modifyCount': 0, 'observables': [{'type': 'ipv4-addr', 'value': '175.181.111.27'}], 'pattern': "[ipv4-addr:value = '175.181.111.27']", 'related': [], 'revoked': False, 'threatLevel': 5, 'threatTypes': ['40702'], 'tlp': 'green', 'type': 'indicator', 'hitLastTime': '2021-11-03T17:14:19Z'}}

主要命令

切片查询

body = { 'from': 0, # 从0开始 'size': 20 # 取多少条数据 }

term精准单值查询

'term': {'source': 'nsfocus'} # 查询source为nsfocus的数据 'term': {'createdBy.keyword': '中国电信'} # 查询中文需要加上keyword

terms精准多值(value)查询(or关系)

'terms': {'source': ['nsfocus', 'tjym']} # 查询source为nsfocus或者tjym的数据

match模糊查询-分词

'match': {'createdBy': '中国电信'} # 模糊查询(会被分词)。只要包含"中国"或者"电信"就会被查询到

multi_match多字段(key)模糊查询(AND)-分词

'multi_match': {'query':'indicator', 'fields': ['type', 'id']} # "type"和"id"字段中包含关键词indicator的数据

match_phrase模糊查询-不分词

'multi_phrase': {'idxkey': 'network'}

match_all搜索所有数据

"match_all":{}

exists存在查询

'exists': {'field': 'indicator'} # 查询存在indicator的数据

prefix前缀查询

'prefix': {'idxkey.keyword': '中国'} # 查询idxkey以中国开头的数据,英文不需要加keyword

wildcard通配符查询

'wildcard': {'idxkey': '*network*'} # ?代表一个字符,*代表0个或多个字符

regexp正则查询

'regexp': {'tlp': 'w+'}

range范围查询

'range': {'collectTime': {'gte': '2021-11-02T00:00:00', 'lt': '2021-12-02T23:59:59'}} # gt : 大于 # lt : 小于 # gte : 大于等于 # lte :小于等于

单条件查询

body ={ 'query':{ 'term': {'source': 'nsfocus'} } }

复合查询bool

bool有三类查询关系:

must:必须都满足

should:只要其中一个满足

must_not:must_not都不满足

body = { 'query': { 'bool': { 'must': [ {'range': {'createdTime': {'gte': '2020-11-06T00:00:00', 'lt': '2020-11-06T23:59:59'}}}, {'match': {'createdBy': "['安全帮']"}} ] } } }

切片查询

# 取20个数据。类似mysql中的limit 0, 20。 注:size可以在es.search中指定,也可以在此指定,默认是10 body = { 'from': 0, # 从0开始 'size': 20 # 取20条数据 }

排序

# 单字段排序 body = { "query":{ # query指定查询条件,可以使用以上的任何命令 "match_all":{} }, "sort":{ # 对满足查询条件的数据按照confidence从小到大排序 "confidence":{"order":"asc"} } } # 多字段排序:先根据confidence升序,再更具threatLevel降序 body = { "query":{ "match_all":{} }, "sort":[{"confidence":{"order":"asc"}}, {"threatLevel":{"order":"asc"}} ], }

聚合

# 查询满足条件数据中confidence的最小值 # 聚合:min、max、sum、mean # confidence_age:最小值的key # min:聚合操作 # field:age对age字段进行求最小值 body = { "from": 0, "size": 3, "aggs":{ "confidence_min":{ "min": {"field":"confidence"} } } }

针对不同数据的命令

基础数据类型

文本数据类型,用于索引全文值的字段。使用文本数据类型的字段,它们会被分词,在索引之前将字符串转换为单个术语的列表(倒排索引),分词过程允许ES搜索每个全文字段中的单个单词。因此适用于match命令,一般不用于聚合

关键字数据类型,用于索引结构化内容的字段。使用keyword类型的字段,其不会被分析(分词),给什么值就原封不动地按照这个值索引,所以关键字字段只能按其确切值进行搜索,适用于term命令,通常用于过滤、排序和聚合。

对比text和keyword数据类型,假如value为"我喜欢运动",以text类型进行存储为"我喜欢你中国",以keyword类型进行存储为"我喜欢"、“运动”,假如match查询时,输入的条件为"运动",那么以text存储时是没有"运动"这个词的,故查不到;但keyword有,故能查到。假如value为“运动”,无论是text还是keyword,存储的都是"运动",因此用term和match都是能查到的。

数字类型,这类数据类型都是以**确切值(不会分词)**存储的,可以使用"term"查询精确匹配。ES支持的数字类型有:long、integer、short、byte、double、float、half_float、scaled_float。

"confidence":{"type":"float"}

日期数据类型。由于JSON中没有表示日期的数据类型,所以ES中的日期可以表示:

日期格式化后的字符串,如:“2021-01-01”

long类型值表示自纪元以来的毫秒数

nteger类型值表示自纪元以来的秒数

如果指定了时区,ES将日期转换为UTC,然后再存储为自纪元以来的毫秒数(long类型)。当字段被设置为date类型时,可以自定义日期格式,但如果未指定格式,则使用默认格式:“strict_date_optional_time||epoch_millis”

范围数据类型。具有大小关系的一个值区间,所以会用到gt、gte、lt、lte…等逻辑表示符。ES支持下面6种范围数据类型:integer_range、long_range、float_range、double_range、date_range、ip_range

复杂数据类型

在ES中,没有专门的数组数据类型,但是默认情况下,任意一个字段都可以包含0或多个值,这意味着每个字段默认都是数组类型。不过ES要求数组类型的各个元素值的数据类型必须相同。

假如type是keyword的array,相关命令需要进行更改(添加一个value):"term": {"labels": {"value": "url"}},labels中包含url(不分词)的数据。

对象类型,类似于python的字典,如下所示是category的数据类型,properties代表了object,针对每个key还可以设置其对应的type

"category":{ "properties":{ "firstCategory":{"type":"keyword"}, "secondCategory":{"type":"keyword"}, "thirdCategory":{"type":"keyword"}}} # 多了一个properties,数字类型时直接是'confidence': {"type":"float"}

对象类型的相关命令也需要进行修改:'term': {"category.firstCategory": "network-attacks"},也可以搭配复合查询bool使用。

nested数据类型是object的延伸版本。object的value是一个字典,nest是字典构成的数组

object数据举例:"category": {"firstCategory": "network-attacks", "secondCategory": "botnet"} nest数据举例:"category": [{"firstCategory": "network-attacks", "secondCategory": "botnet"}, {"firstCategory": "network-attacks", "secondCategory": "botnet", "thirdCategory": "c2"}]

对象类型的相关命令也需要进行修改,使用专门的nested命令,假如插入的一条数据是:

// 插入的数据 { "title":"Speed", "actors":[ { "first_name":"Keanu", "last_name":"Reeves" }, { "first_name":"Dennis", "last_name":"Hopper" } ] } // nested查询语句 {"query": { "bool": { "must": [ {"match": {"title": "Speed"}}, {"nested": {"path": "actors", "query": { "bool": { "must": [{"match": {"actors.first_name":"Keanu"}}, {"match": {"actors.last_name": "Hopper"}}] } } } }] } } } // 如果数据中first_name的值还是个object,在nested语句中可以写成actors.first_name.next_objective_key

操作:插入数据(增)

python

from elasticsearch import Elasticsearch def connect_es(): # 连接数据库 es = Elasticsearch([{'host': 'xxx'}, {'host': 'xxx'}, {'host': 'xxx'}], http_auth=('xxx', 'xxx'), timeout=3600) return es body ={ 'name': 'tom', 'aget': 25 } es = connect_es() # index的作用比较多,通过只传入body的方式,可以插入一条数据 # 也可以既传body,也传id。如果id号不存在,插入该条数据,id号为指定的值;如果id号存在,则更新值 # 如果索引不存在,会直接创建索引 result = es.index(index='indicator', body=body) # es.indices.refresh(index='indicator') # 一定要加上刷新,否则很容易报错

命令行

// put:需要设定数据ID(同一条数据首次插入是created,再次插入会updated) curl -u elastic:Pis@ta12735f2A56 -PUT "http://127.0.0.1:9200/indicator/_doc/1" -H 'Content-Type: application/json' -d' { "name":"stono", "country":"China", "age":111, "date":"1999-11-11" }'' // post,可选择设定数据ID(不指定id情况下:同一条数据首次插入是created,再次插入还是created,但_id会变;指定id若id不变第二次插入失败) curl -u elastic:Pis@ta12735f2A56 -POST "http://127.0.0.1:9200/indicator/_doc/" -H 'Content-Type: application/json' -d' { "name":"stono", "country":"China", "age":111, "date":"1999-11-11" }'

操作:删除数据(删)

python(含按id号方式)

from elasticsearch import Elasticsearch def connect_es(): # 连接数据库 es = Elasticsearch([{'host': 'xxx'}, {'host': 'xxx'}, {'host': 'xxx'}], http_auth=('xxx', 'xxx'), timeout=3600) return es body ={ 'query':{ 'term': {'source': 'nsfocus'} } } # body = { # "query": { # "bool": { # "should": [ # { # "ids": { # "values": ['tool'] # } # } # ] # } # } # } es = connect_es() result = es.delete(index='indicator', body=body) # 通过指定body来查询数据 es.indices.refresh(index='indicator') # 一定要加上刷新,否则很容易报错

命令行(含按id号方式)

curl -X POST 'http://10.0.10.11:9200/indicator/_delete_by_query' -H 'content-Type:application/json' -d ' { "query": { "match": { "name": "测试" } } }' // 按id号删除 curl -X POST 'http://10.0.10.11:9200/indicator/_delete_by_query' -H 'content-Type:application/json' -d ' { "query": { "bool": { "should": [ { "ids": { "values": [1,2] } } ] } } }

操作:修改数据(改)

python

# 方法一:参考插入数据中的index方法 # 同时指定body和id,id存在,更新数据为body;id不存在插入body数据 # 方法二:指定ID单条更新:update data={'doc':{"age":77}} # doc必须存在 es.update(index='test',id=3,doc_type='_doc',body=data) es.indices.refresh(index='test') """ 使用update_by_query批量更新:该方法需要结合painless语言 body分为两部分: query是查询语言,表示对query查询出的数据进行更新 script:表示以脚本的方式修改doc,value就是修改的具体内容 lang:指明脚本的语言,painless表示以es内置的脚本语言进行修改. 此外es还支持多种脚本语言, 如Python, js等等 inline: 脚本内容,也就是更新的内容(本例将createdBy修改为['中国']) ctx: 代表es上下文 _source:表示对_source这个key createdBy:是_source中的一个key params:下方的params参数,这里指定value params: 指定inline中用到的参数 """ body = { "script": { "inline": "ctx._source.createdBy = params.newProps", "lang": "painless", "params": { "newProps": ["中国"] } }, "query": { "match": {"createdBy": '["安全"]'} } } es.update_by_query(index='test2',body=data_all) es.indices.refresh(index='test2')

命令行

curl -X POST 'http://10.0.10.11:9200/indicator/_update_by_query' -H 'content-Type:application/json' -d '{ "script": { "inline": "ctx._source.createdBy = params.newProps", "lang": "painless", "params": { "newProps": ["中国"] } }, "query": { "match": {"createdBy": '["安全"]'} }}'

# 批量修改createdBy,将电信安全改为中国curl -u user:123455 -POST "http://172.160.100.224:9200/indicator/_update_by_query" -H 'Content-Type: application/json' -d '{ "script": { "inline": "ctx._source.createdBy = params.newProps", "lang": "painless", "params": { "newProps": ["中国"] } }, "query": { "match": {"createdBy": '["安全"]'} }}'

操作:查询数据(查)

python

search的方式:按照body进行查询,常用该方法

from elasticsearch import Elasticsearchdef connect_es(): # 连接数据库 es = Elasticsearch([{'host': 'xxx'}, {'host': 'xxx'}, {'host': ''}], xxx http_auth=('xxx', 'xxx'), timeout=3600) return esbody ={ 'query':{ 'term': {'source': 'nsfocus'} }}es = connect_es()result = es.search(index='indicator', body=body) # 通过指定body来查询数据es.indices.refresh(index='indicator') # 一定要加上刷新,否则很容易报错

get或者get_source按照id进行查询:不常用

from elasticsearch import Elasticsearchdef connect_es(): # 连接数据库 es = Elasticsearch([{'host': 'xxx'}, {'host': 'xxx'}, {'host': 'xxx'}], http_auth=('xxx', 'xxx'), timeout=3600) return eses = connect_es()result = es.get_source(index='indicator', id='indicator-6461a5b7de8c4d2c9e1fcb49fecb0196') es.indices.refresh(index='indicator') # 一定要加上刷新,否则很容易报错

命令行

// search的方式curl -u user:password -XGET "http://172.16.100.21:9000/indicator/_search?pretty" -H 'Content-Type: application/json' -d'{ "query": { "match_all": {} }, "from": 0, "size": 1}'// search的第二种方法:查询tlp为green的文档curl -u elastic:Pis@ta12735f2A56 -XGET "http://172.160.111.221:9000/indicator/_search?pretty&q=tlp:green" -H 'Content-Type: application/json'

批量增改

#批量对不同的索引进行增删改查操作,每个json一行batch_action=[ {"index": {"_index": "test", "_type": "_doc", "_id": "1"}}, # index表示插入 {"name": "tom", "age": 25, "email":"12345@163.com", "company":"test1" }, # 插入的数据 {"index": {"_index": "test", "_type": "_doc", "_id": "2"}}, # index表示插入 {"name": "lucy", "age": 33, "email":"lucy@163.com", "company":"test2" }, # 插入的数据 {"delete": {"_index": "test", "_type": "_doc", "_id": "3"}}, # delete删除数据 {"create": {"_index" : "test", "_type" : "_doc", "_id": "4"}}, # create插入数据 {"name": "bob", "age": 24, "email":"bob@csdn.com", "company":"test3" }, # 插入的数据 {"update": {"_index": "test", "_type": "_doc", "_id": "5"}}, # 更新数据 {"doc": {"age": "12"}} # 更新的数据字段和值 ]es.bulk(index='test2',doc_type='_doc',body=batch_action)

其他操作

查看es所有索引

python

from elasticsearch import Elasticsearches = Elasticsearch([{'host': '172.160.100.224'}, {'host': '172.160.100.133'}, {'host': '172.160.100.153'}], http_auth=('user', '123456'), timeout=3600)indices=es.indices.get_alias().keys()print(sorted(indices))

命令行模式

curl -u user:123456 -XGET "http://172.160.100.224:9200/_cat/indices?v"

查看index的信息(包括数据类型等)

curl -u elastic:Pis@ta12735f2A56 -XGET "http://172.16.0.224:9200/indicator?pretty" -H 'Content-Type: application/json'

// 完整结果{ "indicator": { "aliases": {}, "mappings": { "properties": { "actTypes": { "type": "integer" }, "categories": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "category": { "properties": { "firstCategory": { "type": "keyword" }, "secondCategory": { "type": "keyword" }, "thirdCategory": { "type": "keyword" } } }, "chainEvidenceId": { "type": "keyword" }, "city": { "type": "keyword" }, "collectSource": { "properties": { "source": { "type": "keyword" }, "valid": { "type": "boolean" } } }, "collectTime": { "type": "date" }, "confidence": { "type": "float" }, "configurations": { "properties": { "cVEDataVersion": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "nodes": { "properties": { "cpe": { "properties": { "cpe22Uri": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "cpe23Uri": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "vulnerable": { "type": "boolean" } } }, "operator": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }, "country": { "type": "keyword" }, "createdBy": { "type": "text" }, "createdTime": { "type": "date" }, "creditLevel": { "type": "integer" }, "cvssInfo": { "properties": { "vKey": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "vValue": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "cvssScore": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "cweId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "dataType": { "type": "text" }, "description": { "type": "text" }, "extReferences": { "properties": { "externalId": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "sourceName": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "url": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "heat": { "type": "long" }, "hitCount": { "type": "long" }, "hitLastTime": { "type": "date" }, "id": { "type": "keyword" }, "idxKey": { "type": "keyword" }, "interfaceType": { "type": "keyword" }, "iocCreateTime": { "type": "date" }, "labels": { "type": "keyword" }, "lang": { "type": "text" }, "lastUpdated": { "type": "date" }, "modifiedBy": { "type": "text" }, "modifiedTime": { "type": "date" }, "modifyCount": { "type": "long" }, "modifyTime": { "type": "date" }, "name": { "type": "text" }, "observables": { "properties": { "displayName": { "type": "text" }, "type": { "type": "keyword" }, "value": { "type": "keyword" } } }, "operator": { "type": "keyword" }, "pattern": { "type": "text" }, "patternType": { "type": "text" }, "patternVersion": { "type": "text" }, "pocFound": { "type": "boolean" }, "public": { "type": "boolean" }, "query": { "properties": { "bool": { "properties": { "must": { "properties": { "match": { "properties": { "observables": { "properties": { "value": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }, "term": { "properties": { "revoked": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }, "must_not": { "properties": { "exists": { "properties": { "field": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } } } } } }, "region": { "type": "keyword" }, "related": { "properties": { "object": { "properties": { "type": { "type": "keyword" }, "value": { "type": "text" } } }, "objectRef": { "properties": { "objId": { "type": "keyword" }, "objTitle": { "type": "text" }, "objType": { "type": "keyword" } } }, "relationship": { "type": "text" }, "threatTypes": { "type": "keyword" } } }, "relatedCount": { "properties": { "infoType": { "type": "keyword" }, "value": { "type": "integer" } } }, "revoked": { "type": "boolean" }, "riskLevel": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "solution": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "sourceRef": { "type": "text" }, "tag": { "properties": { "tagType": { "type": "keyword" }, "tagValues": { "type": "text" } } }, "threatLevel": { "type": "integer" }, "threatTypes": { "type": "keyword" }, "timestamp": { "type": "date" }, "tlp": { "type": "keyword" }, "type": { "type": "keyword" }, "validFrom": { "type": "date" }, "validUntil": { "type": "date" }, "vulnerabilityIds": { "properties": { "cve": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "xf": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } } } }, "settings": { "index": { "creation_date": "1604625303266", "number_of_shards": "3", "number_of_replicas": "1", "uuid": "uwv6wSdiTA2zY7SEY5ZsOQ", "version": { "created": "7080099" }, "provided_name": "indicator" } } }}

删除指定index

python

res=requests.delete(es_http+'/'+'new_index',auth=auth) #删除索引

命令行

curl -u user:12735 -DELETE "http://172.160.100.224:9000/indicator"

filter_path显示指定字段

python

添加过滤路径。通过指定字段,只显示数据的指定字段信息(默认显示所有字段的信息)。

from elasticsearch import Elasticsearchdef connect_es(): # 连接数据库 es = Elasticsearch([{'host': 'xxx'}, {'host': 'xxx'}, {'host': 'xxx'}], http_auth=('xxx', 'xxx'), timeout=3600) # es = Elasticsearch([{'host': 'xxx', 'port': 9200}], timeout=3600) # 不需要密码 return eses = connect_es()index = 'indicator'body = { "query":{ "match_all":{} }}filter_path=['hits.hits._source.labels', # 字段1 'hits.hits._source.category'] # 字段2res = es.search(index=index, body=body, filter_path=filter_path) # 指定字段:filter_pathprint(res)

命令行

curl -u user:123445 -XGET "http://172.160.100.224:9200/indicator/_search?pretty" -H 'Content-Type: application/json' -d'{ "query": { "match_all": {} }, "_source": ["labels", "category"]}'

条件查询总数

nums = es.count(index='alert', body=body)['count']es.indices.refresh(index="alert")print('------nums------', nums)

翻页查找

方法1

body={ 'query':{'match_all': {}},}# size:设置一页数据量result = es.search(index='indicator', scroll='5m', size=5, body=body)es.indices.refresh(index="indicator")# 获取总的数据量,用于得到总的数据页数total = result['hits']['total']['value']# 或者nums = es.count(index='alert', body=body)['count']# 获取初始翻页idscrid = result['_scroll_id']# 第一页的数据first_page_data = result['hits']['hits'] # 数据构成的列表# 开始翻页for i in range(5): #翻5页 next_page_data = es.scroll(scroll_id=scrid, scroll='5m') next_page_data = next_page_data['hits']['hits'] # 数据构成的列表

方法2(推荐)

# 设置一页的数据量content_size = 100 # 获取的总的数据量# 按常规方法进行第一次查询first_body = { "query": { "match_all": {} }, 'sort': [ {"confidence": "asc"}, {"_id": "desc"}], # 按照confidence和_id排序,后续会根据这两个值进行排序 'size': 10 # 指定当前页数据量}first_reslut = es.search(index='tool', body=body)['hits']['hits']es.indices.refresh(index="tool")# 主要内容是first_result['_source']['collectSource']# 获取最后一个文档sort后的confidence与id值next_id = first_result[-1]['_source']['sort'] # 返回的结果是[80.0, 'tool-fffa800581654d5fa6ff0051a9e61563']size_cont = len(first_result) while content_size==size_cont: # 若数据量不等于指定的数据量,说明遍历到最后的一页数据了 second_body = { "query": { "match_all": {} }, 'sort': [ {"confidence": "asc"}, {"_id": "desc"}], 'search_after': next_id, # second_body和firt_body仅相差一行代码:从此数据之后的第1个数据开始,但不包含此数据 'size': 10 } next_result = es.search(index='tool', body=body)['hits']['hits'] es.indices.refresh(index="tool") next_id = next_result[-1]['_source']['sort'] for result in next_result: print(result)

ElasticSearch基本命令&python库使用

Elasticsearch Python

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:简单的成绩录入系统程序及分析以及思考
下一篇:关于K8s中Service Account的一些笔记:Pod内部如何访问K8s API Server
相关文章