原文地址:http://blog.csdn.net/u012116196/article/details/51754845
BulkProcessor类提供了一个简单接口自动冲洗批量操作基于请求的数量或大小,或者在给定的时期。
使用它,首先创建一个 BulkProcessor实例:
[java] view plain copy import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, //<1> new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } //<2> @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } //<3> @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } //<4> }) .setBulkActions(10000) //<5> .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) //<6> .setFlushInterval(TimeValue.timeValueSeconds(5)) //<7> .setConcurrentRequests(1) //<8> .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) //<9> .build();
添加您elasticsearch客户
调用此方法之前执行。例如你可以看到numberOfActions request.numberOfActions()
批量执行后调用此方法。例如你可以检查是否有一些失败的请求 response.hasFailures()
调用此方法时,大部分失败了 Throwable
我们想执行批量每10 000个请求
我们想要冲洗每1 gb
我们想要冲洗大部分每5秒钟任何请求的数量
设置并发请求的数量。值为0意味着只有一个单一的请求将被允许执行。值为1时表示并发请求允许同时积累新的批量执行请求。
设置一个自定义的补偿政策,最初将等待100 ms,增加指数和重试3倍。重试尝试当一个或多个大部分项目没有一个请求。 EsRejectedExecutionException这表明有太少的计算资源可用于处理请求。禁用倒扣,通过BackoffPolicy.noBackoff().
然后你可以添加你的请求 BulkProcessor:
[java] view plain copy bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));默认情况下, BulkProcessor:
集bulkActions 1000 集bulkSize 5mb 不设置flushInterval concurrentRequests设置为1 集backoffPolicy指数倒扣8重试和开始50毫秒的延迟。总等待时间大约是5.1秒。当文档加载到 BulkProcessor它可以通过使用封闭 awaitClose或 close方法:
[java] view plain copy bulkProcessor.awaitClose(10, TimeUnit.MINUTES); 或 [java] view plain copy bulkProcessor.close();