这里,我采用的是elasticsearch-py与es集群进行交互。 因为有6台服务器作为es集群,如果只是针对一台进行录入,显然效率不高,所以可以同时对所有节点进行录入。 首先按照官网的优化介绍,有几个设置参数是必须在创建表之前就设定的,一旦录入数据之后,这几个参数就没法修改,并参考了下徽沪一郎大神这篇博文《elasticsearch性能调优》。 1 - 通过kibana的dev tools页面:
PUT myindex { "mappings":{ "mytype":{ "_all":{ "enabled":false }, "properties":{ "field1":{ "type":"integer" }, "field2":{ "type":"text" }, "field3":{ "type":"text", "index":false } } } }, "settings":{ "index":{ "number_of_shards":10, "number_of_replicas":0,//可后续动态修改 "codec":"best_compression",//只能初始化时候指定 "query.default_field":"field1", "refresh_interval":"60s" } } }采用了多节点录入,修改内部urllib默认的请求时间,这里只是单进程录入。 官方文档
#! /opt/python2 # -*- coding: utf-8 -*- import logging from elasticsearch import Elasticsearch from elasticsearch import helpers logging.basicConfig( level = logging.INFO, format = "%(asctime)s %(message)s" ) class Import4ES(object): def __init__(self,index,file): ''' 1 - 通过如下将每个节点都加入Elasticsearch,进行初始化,此时后续录入的时候,它就会对所有节点都有机会录入,加快录入速度; 2 - timeout是对内部的urllib3模块的连接超时进行设置,默认是10s; 3 - 之前开启sniffing,不过发现会报错,所以以此方法代替之 ''' super(Import4ES, self).__init__() self.es = Elasticsearch( ['http://host1:9200', 'http://host2:9200', 'http://host3:9200', 'http://host4:9200', 'http://host5:9200', 'http://host6:9200'], timeout = 600 ) self.index = index self.file = file def insert(self,blockDiLi): ''' 1 - 采用elasticsearch中的helpers来调用批量录入函数,即bulk,其中blockDiLi(自定义变量)是一个字典元素的列表,如一次录入500条,那么这个列表中就有500个字典元素 ''' try: status = helpers.bulk(self.es, blockDiLi) except Exception as e: logging.error('insert failed:%s'%str(e)) status = 1 return status def oneline(self,line): fields = line.split() action = { "_index":self.index, "_type":"mytype", "_source":{ "field1":fields[0], "field2":fields[1], "field3":fields[2], "field4":fields[3], "field5":fields[4], "field6":fields[5], "field7":fields[6] } } return action def handle_blockDiLi(self,length = 500): ''' 1 - 读取所需要的文件,读取每个500行,然后进行录入 ''' with open(self.file) as fr: curlen = 0 blockDiLi = [] for ind,line in enumerate(fr): if (ind+1)%length == 0: self.insert(blockDiLi) blockDiLi = [] blockDiLi.append(self.oneline(line)) else: blockDiLi.append(self.oneline(line)) self.insert(blockDiLi) if __name__ == '__main__': file = 'test.txt' ans = Import4ES('myindex',file) ans.handle_blockDiLi()从kibana上的indexing Rate上看, 2017/03/08晚上:单进程录入平均稳定在3000条/s, 2017/03/09上午:单进程录入稳定在5000条/s;两个进程同时录入,基本稳定在9500条/s;三个进程同时录入,基本稳定在12500条/s。
2017/03/08 第一次修改!