Elasticsearch java API批量处理

    xiaoxiao2021-03-25  79

    原文地址:http://blog.csdn.net/u012116196/article/details/51754845

    BulkProcessor类提供了一个简单接口自动冲洗批量操作基于请求的数量或大小,或者在给定的时期。

    使用它,首先创建一个 BulkProcessor实例:

    [java]  view plain  copy import org.elasticsearch.action.bulk.BackoffPolicy;   import org.elasticsearch.action.bulk.BulkProcessor;   import org.elasticsearch.common.unit.ByteSizeUnit;   import org.elasticsearch.common.unit.ByteSizeValue;   import org.elasticsearch.common.unit.TimeValue;      BulkProcessor bulkProcessor = BulkProcessor.builder(           client,  //<1>           new BulkProcessor.Listener() {               @Override               public void beforeBulk(long executionId,                                      BulkRequest request) { ... } //<2>                  @Override               public void afterBulk(long executionId,                                     BulkRequest request,                                     BulkResponse response) { ... } //<3>                  @Override               public void afterBulk(long executionId,                                     BulkRequest request,                                     Throwable failure) { ... } //<4>           })           .setBulkActions(10000//<5>           .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) //<6>           .setFlushInterval(TimeValue.timeValueSeconds(5)) //<7>           .setConcurrentRequests(1//<8>           .setBackoffPolicy(               BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) //<9>           .build();  

    添加您elasticsearch客户

    调用此方法之前执行。例如你可以看到numberOfActions request.numberOfActions()

    批量执行后调用此方法。例如你可以检查是否有一些失败的请求 response.hasFailures()

    调用此方法时,大部分失败了 Throwable

    我们想执行批量每10 000个请求

    我们想要冲洗每1 gb

    我们想要冲洗大部分每5秒钟任何请求的数量

    设置并发请求的数量。值为0意味着只有一个单一的请求将被允许执行。值为1时表示并发请求允许同时积累新的批量执行请求。

    设置一个自定义的补偿政策,最初将等待100 ms,增加指数和重试3倍。重试尝试当一个或多个大部分项目没有一个请求。 EsRejectedExecutionException这表明有太少的计算资源可用于处理请求。禁用倒扣,通过BackoffPolicy.noBackoff().

    然后你可以添加你的请求 BulkProcessor:

    [java]  view plain  copy bulkProcessor.add(new IndexRequest("twitter""tweet""1").source(/* your doc here */));   bulkProcessor.add(new DeleteRequest("twitter""tweet""2"));  

    默认情况下, BulkProcessor:

    集bulkActions 1000 集bulkSize 5mb 不设置flushInterval concurrentRequests设置为1 集backoffPolicy指数倒扣8重试和开始50毫秒的延迟。总等待时间大约是5.1秒。

    当文档加载到 BulkProcessor它可以通过使用封闭 awaitClose close方法:

    [java]  view plain  copy bulkProcessor.awaitClose(10, TimeUnit.MINUTES);   [java]  view plain  copy bulkProcessor.close();  
    转载请注明原文地址: https://ju.6miu.com/read-18241.html

    最新回复(0)