[Work Notes] Elasticsearch from Scratch (Part 4): Java

    xiaoxiao · 2021-03-25

    Starting Elasticsearch

    # Linux
    ./bin/elasticsearch

    # Windows
    bin\elasticsearch.bat

    Note: the Windows startup method is used here for testing the Java API.

    Startup success page (screenshot omitted)

    Maven

    pom.xml

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>5.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.7</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.lucene</groupId>
            <artifactId>lucene-core</artifactId>
            <version>6.4.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- Set the Main-Class so the jar runs with: java -jar yourjar.jar -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.elasticsearch.demo.Generate</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    src/main/resources/log4j2.properties

    appender.console.type = Console
    appender.console.name = console
    appender.console.layout.type = PatternLayout
    rootLogger.level = info
    rootLogger.appenderRef.console.ref = console

    Creating a Java Client

    The most common way to obtain a client is to create a TransportClient that connects to a cluster.

    TransportClient

    The TransportClient uses the transport module to connect to a remote Elasticsearch cluster.


    Elasticsearch has a cluster "sniffing" feature. When sniffing is enabled:

    The client can dynamically add or remove host addresses. The transport client connects to the nodes in its internal node list and calls the cluster state API on those nodes to discover available data nodes. The internal node list is then replaced with the discovered data nodes (the list is refreshed every 5 s).

    Note: a node that is not a data node may be excluded from the list even if it was one of the original connection points. For example, if you initially connect to a master node, after sniffing requests are no longer sent to that master node but to the discovered data nodes, so that search traffic is not concentrated on the master.

    import java.net.InetAddress;

    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;
    import org.elasticsearch.transport.client.PreBuiltTransportClient;

    Settings settings = Settings.builder()
            .put("cluster.name", "myClusterName")   // cluster name
            .put("client.transport.sniff", true)    // enable sniffing
            .build();

    // open the connection (pass the settings built above)
    TransportClient client = new PreBuiltTransportClient(settings)
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

    // close the connection
    client.close();

    Other parameters

    client.transport.ignore_cluster_name     // ignore cluster name validation of connected nodes
    client.transport.ping_timeout            // time to wait for a ping response from a node (default 5s)
    client.transport.nodes_sampler_interval  // how often to sample/ping the listed and connected nodes (default 5s)

    Coordinating Node

    You can start a coordinating-only node locally and then simply create a TransportClient in your application that connects to it. This way only the coordinating node needs to load whatever plugins you require.
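    As a sketch, a node can be made coordinating-only by disabling its other roles in elasticsearch.yml (these are the settings documented for Elasticsearch 5.x; adjust for your version):

    ```yaml
    # coordinating-only node: routes requests but holds no data,
    # is never master, and runs no ingest pipelines
    node.master: false
    node.data: false
    node.ingest: false
    ```

    The TransportClient in your application then connects only to this local node.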

    Document API

    CRUD

    Single-document APIs

    1. Index API
    2. Get API
    3. Delete API
    4. Delete By Query API
    5. Update API

    Multi-document APIs

    1. Multi Get API
    2. Bulk API


    Ways to Generate JSON

    The index API allows you to index a typed JSON document into a specific index, making it searchable.

    Several ways to generate a JSON document:

    Manual string concatenation

    String json = "{" +
            "\"user\":\"kimchy\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";

    Automatic conversion from a Map

    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user", "kimchy");
    json.put("postDate", new Date());
    json.put("message", "trying out Elasticsearch");

    Using a third-party library such as Jackson

    import com.fasterxml.jackson.databind.*;

    // instantiate a JSON mapper: create once, reuse
    ObjectMapper mapper = new ObjectMapper();
    // generate JSON
    byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

    Using XContentFactory.jsonBuilder() (Elasticsearch's built-in content builder)

    import static org.elasticsearch.common.xcontent.XContentFactory.*;

    XContentBuilder builder = jsonBuilder()
            .startObject()              // begin an object
                .field("user", "kimchy")
                .field("postDate", new Date())
                .field("message", "trying out Elasticsearch")
            .endObject();
    String json = builder.string();
    // arrays can be built analogously with startArray()/endArray()

    Index API

    The generated document is passed in as the _source field:

    import static org.elasticsearch.common.xcontent.XContentFactory.*;

    // using the users of some site as an example:
    // index user "1" into the "tmall" index, type "user"
    IndexResponse response = client.prepareIndex("tmall", "user", "1")
            .setSource(jsonBuilder()    // put the JSON document into the _source field
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "Elasticsearch")
                    .endObject()
            )
            .get();

    A ready-made JSON string can be passed directly; here the document ID is omitted, so Elasticsearch generates one:

    String json = "{" +
            "\"user\":\"kimchy\"," +
            "\"postDate\":\"2013-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";

    IndexResponse response = client.prepareIndex("twitter", "tweet")
            .setSource(json)
            .get();

    IndexResponse

    The response reports details of the operation:

    // index name
    String _index = response.getIndex();
    // type name
    String _type = response.getType();
    // document ID (generated or not)
    String _id = response.getId();
    // version (the first time you index this document, you get: 1)
    long _version = response.getVersion();
    // status of the current operation
    RestStatus status = response.status();

    Operation threading

    setOperationThreaded(boolean) — when set to false, the operation is executed on the calling thread.

    For more on the Index API, see the reference documentation.


    Get API

    GetResponse response = client.prepareGet("tmall", "user", "1").get();

    Delete API

    DeleteResponse response = client.prepareDelete("tmall", "user", "1").get();

    Delete By Query API

    BulkIndexByScrollResponse response =
            DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("sex", "男"))  // condition: sex = "男" ("male")
                .source("persons")                              // index
                .get();
    long deleted = response.getDeleted();

    Asynchronous delete

    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
        .filter(QueryBuilders.matchQuery("sex", "男"))
        .source("persons")
        .execute(new ActionListener<BulkIndexByScrollResponse>() {
            @Override
            public void onResponse(BulkIndexByScrollResponse response) {
                long deleted = response.getDeleted();
            }

            @Override
            public void onFailure(Exception e) {
                // handle the exception
            }
        });

    Update API

    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index("index");
    updateRequest.type("type");
    updateRequest.id("1");
    updateRequest.doc(jsonBuilder()
            .startObject()
                .field("sex", "女")   // "女" = "female"
            .endObject());
    client.update(updateRequest).get();

    Update By Script

    // update with an inline script
    client.prepareUpdate("tmall", "user", "1")
            .setScript(new Script("ctx._source.sex = \"男\"",
                    ScriptService.ScriptType.INLINE, null, null))
            .get();

    // update with a partial document
    client.prepareUpdate("tmall", "user", "1")
            .setDoc(jsonBuilder()
                    .startObject()
                        .field("sex", "女")
                    .endObject())
            .get();

    // or build an UpdateRequest explicitly
    UpdateRequest updateRequest = new UpdateRequest("tmall", "user", "1")
            .script(new Script("ctx._source.sex = \"男\""));
    client.update(updateRequest).get();

    A partial document can also be merged into the existing document:

    UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
            .doc(jsonBuilder()
                    .startObject()
                        .field("gender", "male")
                    .endObject());
    client.update(updateRequest).get();

    Upsert (if the document does not exist yet, the upsert document is indexed; otherwise the partial document is applied)

    IndexRequest indexRequest = new IndexRequest("index", "type", "1")
            .source(jsonBuilder()
                    .startObject()
                        .field("name", "Joe Smith")
                        .field("gender", "male")
                    .endObject());
    UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
            .doc(jsonBuilder()
                    .startObject()
                        .field("gender", "male")
                    .endObject())
            .upsert(indexRequest);   // indexed if the document does not exist yet
    client.update(updateRequest).get();

    Multi Get API (batch get)

    MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
            .add("twitter", "tweet", "1")            // a single id
            .add("twitter", "tweet", "2", "3", "4")  // several ids for the same index/type
            .add("another", "type", "foo")           // from another index
            .get();

    for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
        GetResponse response = itemResponse.getResponse();
        if (response.isExists()) {
            String json = response.getSourceAsString();
        }
    }

    For more on the Multi Get API, see the reference documentation.

    Bulk API (batch import)

    import static org.elasticsearch.common.xcontent.XContentFactory.*;

    BulkRequestBuilder bulkRequest = client.prepareBulk();

    // either use client#prepare, or use Requests# to directly build index/delete requests
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
            .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
            )
    );
    bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
            .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
            )
    );

    BulkResponse bulkResponse = bulkRequest.get();
    if (bulkResponse.hasFailures()) {
        // process failures by iterating through each bulk response item
    }

    Bulk Processor

    The BulkProcessor class offers a simple interface that automatically flushes bulk operations based on the number or size of accumulated requests, or after a given period.

    import org.elasticsearch.action.bulk.BackoffPolicy;
    import org.elasticsearch.action.bulk.BulkProcessor;
    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.bulk.BulkResponse;
    import org.elasticsearch.common.unit.ByteSizeUnit;
    import org.elasticsearch.common.unit.ByteSizeValue;
    import org.elasticsearch.common.unit.TimeValue;

    // build a BulkProcessor around the Elasticsearch client
    BulkProcessor bulkProcessor = BulkProcessor.builder(
            client,
            new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    // e.g. request.numberOfActions()
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                    // e.g. response.hasFailures()
                }

                @Override
                public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                    // e.g. log the Throwable
                }
            })
            // flush every 10 000 requests
            .setBulkActions(10000)
            // flush when the batch reaches 5 MB
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
            // flush every 5 seconds regardless
            .setFlushInterval(TimeValue.timeValueSeconds(5))
            // number of concurrent requests: 0 means a single request executes
            // synchronously; 1 means one bulk request may execute while new
            // requests accumulate
            .setConcurrentRequests(1)
            // on failure, retry up to 3 times with an initial delay of 100 ms
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            .build();
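    To make the retry setting concrete, the sketch below computes the wait times of a simple doubling backoff with a 100 ms initial delay and 3 retries. It is an illustration only, not Elasticsearch's actual BackoffPolicy formula (whose delays grow on a different curve); the class and method names are invented for the example.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class BackoffSketch {
        // Wait times for a simple doubling backoff: each retry waits
        // twice as long as the previous one, starting at initialMillis.
        static List<Long> delays(long initialMillis, int maxRetries) {
            List<Long> out = new ArrayList<>();
            long delay = initialMillis;
            for (int i = 0; i < maxRetries; i++) {
                out.add(delay);
                delay *= 2; // double the delay after each failed attempt
            }
            return out;
        }

        public static void main(String[] args) {
            // 3 retries starting at 100 ms
            System.out.println(delays(100, 3)); // [100, 200, 400]
        }
    }
    ```

    Spacing retries out like this keeps a struggling cluster from being hammered with immediate re-sends of the same bulk request.
    
    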
    When reposting, please credit the original address: https://ju.6miu.com/read-14817.html
