Note: Elasticsearch is started on Windows here, in order to test the Java API.
pom.xml
<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>5.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
        <version>2.9.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.lucene</groupId>
        <artifactId>lucene-core</artifactId>
        <version>6.4.2</version>
    </dependency>
</dependencies>

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <!-- set the Main-Class so the jar runs via `java -jar yourjar.jar` -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>org.elasticsearch.demo.Generate</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

src/main/resources/log4j2.properties
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
rootLogger.level = info
rootLogger.appenderRef.console.ref = console

The most common way to obtain a client is to create a TransportClient that connects to a cluster.
The transport module is used to connect to a remote Elasticsearch cluster.
Elasticsearch has a cluster "sniffing" feature. With sniffing enabled, the client can discover the rest of the cluster, and hosts can be added or removed dynamically. The transport client connects to the nodes in its internal node list and calls the cluster state API on those nodes to discover the available data nodes. The client then replaces its internal node list with those data nodes (the list is refreshed every 5s). Note that a node may be excluded after sniffing if it is not a data node, including the node originally connected to. For example, if you initially connect to a master node, after sniffing the client no longer routes requests to that master node but to the data nodes instead, to avoid spending search traffic on the master node.
Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName")  // cluster name
        .put("client.transport.sniff", true)   // enable "sniffing"
        .build();

// open the connection (pass the settings built above)
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// close the connection
client.close();

Other parameters
// ignore cluster name validation of connected nodes
client.transport.ignore_cluster_name
// the time to wait for a ping response from a node (5s)
client.transport.ping_timeout
// how often to sample/ping the nodes listed and connected (5s)
client.transport.nodes_sampler_interval

Alternatively, run a coordinating-only node locally and simply create a TransportClient in your application that connects to that coordinating node. This way, only the coordinating node needs to load whatever plugins you require.
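As a sketch, the three parameters above can be set on the same Settings.builder() used earlier for cluster.name and sniffing. The values below are illustrative, not the defaults, and "localhost" is a placeholder host:

```java
import java.net.InetAddress;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class TransportSettingsDemo {
    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put("cluster.name", "myClusterName")
                // skip cluster-name validation when connecting
                .put("client.transport.ignore_cluster_name", true)
                // wait up to 10s for a ping response instead of the 5s default
                .put("client.transport.ping_timeout", "10s")
                // re-sample the node list every 10s instead of every 5s
                .put("client.transport.nodes_sampler_interval", "10s")
                .build();

        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("localhost"), 9300));
        // ... use the client ...
        client.close();
    }
}
```

This only changes client-side behavior; nothing needs to be configured on the cluster for these settings to take effect.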
1. Multi Get API
2. Bulk API
The Index API allows you to index a typed JSON document into a specific index and make it searchable.
There are several ways to generate the JSON document:
Manual string concatenation
String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

Automatic conversion from a Map
Map<String, Object> json = new HashMap<String, Object>();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out Elasticsearch");

Using a third-party library such as Jackson
import com.fasterxml.jackson.databind.*;

// instantiate a JSON mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate JSON
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

Using XContentFactory.jsonBuilder() (Elasticsearch's built-in content builder)
import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject() // build an object
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject();
String json = builder.string();
// to build an array, use builder.startArray() ... builder.endArray()

The generated JSON is then placed into the document's _source field.
import static org.elasticsearch.common.xcontent.XContentFactory.*;

// taking a website's users as an example:
// index user "1" of the tmall mall
IndexResponse response = client.prepareIndex("tmall", "user", "1")
        .setSource(jsonBuilder()
            .startObject()
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "Elasticsearch")
            .endObject()
        ) // put the JSON document into the _source field
        .get();

If the source is just a JSON string, no id is required (one will be generated):
String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json)
        .get();

The response reports what happened:
// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status holds the result of the current operation
RestStatus status = response.status();

For more about the Index API, see the reference documentation.
For more about the Multi Get API, see the reference documentation.
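The Multi Get API listed above is never shown in this post; as a sketch, it fetches several documents in one round trip, assuming the client from the earlier examples and the "twitter"/"tweet" index used above:

```java
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;

// fetch several documents, possibly from several indices, in a single request
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
        .add("twitter", "tweet", "1")            // a single id
        .add("twitter", "tweet", "2", "3", "4")  // several ids at once
        .get();

// iterate over the per-document results
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) { // skip ids that were not found
        String sourceJson = response.getSourceAsString(); // the _source of the hit
    }
}
```

Each item in the response corresponds to one requested id, in order, so missing documents show up as non-existent items rather than errors.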
The BulkProcessor class offers a simple interface that flushes bulk operations automatically, based on the number or size of accumulated requests or after a given period.
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client, // your Elasticsearch client
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // called before each bulk executes, e.g. request.numberOfActions()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // called after each bulk executes, e.g. response.hasFailures()
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // called when a bulk fails, with the raised Throwable
            }
        })
        .setBulkActions(10000) // flush after this many requests (10000)
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush at this size (5 MB)
        .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5s regardless
        // number of concurrent requests: 0 means a single request executes at a time;
        // 1 means one flush may execute while new bulk requests accumulate
        .setConcurrentRequests(1)
        // retry failed requests up to 3 times, starting at a 100ms interval
        .setBackoffPolicy(
                BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
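Once built, requests are handed to the processor with add(), and the processor flushes them according to the thresholds configured above. A sketch of typical usage, assuming the bulkProcessor above and a JSON string like the earlier examples:

```java
import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;

// queue individual index/delete requests; the processor batches and flushes them
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(json));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

// flush anything pending and wait for in-flight bulks before shutting down
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
// or, to flush and close without waiting for the last responses:
// bulkProcessor.close();
```

awaitClose is the safer shutdown when setConcurrentRequests is greater than 0, since close() returns without waiting for concurrent bulk requests still in flight.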