2.ES文档基本操作CURD实例演示

一灰灰blogSpringBoot搜索系列ElasticSearchElasticSearch约 1758 字大约 6 分钟

本文将作为es系列第二篇,在前文项目搭建的基础上,先来看一下es的基本操作姿势,如何实现CURD

I. 项目搭建

1. 项目依赖

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA进行开发

开一个web服务用于测试

<dependencies>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
    </dependency>
</dependencies>

2. 配置信息

配置文件application.yml,注意下面的配置信息,下面采用的是由我们自己来解析配置的方式

elasticsearch:
  host: localhost
  port: 9200
  user: elastic
  pwd: test123
  connTimeout: 3000
  socketTimeout: 5000
  connectionRequestTimeout: 500

II. CURD实例

1. 配置

注意,本文介绍的es是添加了权限验证,因此我们在于es进行交互时,需要在请求头中携带验证信息,注意下面的实现姿势

读取配置,初始化RestHighLevelClient,和前文介绍的差不多

@Getter
@Configuration
public class ElasticsearchConfiguration {

    @Value("${elasticsearch.host}")
    private String host;

    @Value("${elasticsearch.port}")
    private int port;

    @Value("${elasticsearch.connTimeout}")
    private int connTimeout;

    @Value("${elasticsearch.socketTimeout}")
    private int socketTimeout;

    @Value("${elasticsearch.connectionRequestTimeout}")
    private int connectionRequestTimeout;

    @Value("${elasticsearch.user}")
    private String user;

    @Value("${elasticsearch.pwd}")
    private String pwd;

    @Bean(destroyMethod = "close", name = "client")
    public RestHighLevelClient initRestClient() {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                        .setConnectTimeout(connTimeout)
                        .setSocketTimeout(socketTimeout)
                        .setConnectionRequestTimeout(connectionRequestTimeout));
        return new RestHighLevelClient(builder);
    }

    @Bean
    public RequestOptions requestOptions() {
        String auth = "Basic " + Base64Utils.encodeToString((user + ":" + pwd).getBytes());
        RequestOptions.Builder build = RequestOptions.DEFAULT.toBuilder();
        build.addHeader("Authorization", auth);
        return build.build();
    }
}

2. 添加数据

@Component
public class BasicCurdDemo {
    @Autowired
    private RestHighLevelClient client;
    @Autowired
    private RequestOptions requestOptions;

    private String TEST_ID = "11123-33345-66543-55231";

    /**
     * 新增数据
     */
    public void addDoc(String indexName, Object obj, String id) throws IOException {
        // 指定索引
        IndexRequest request = new IndexRequest(indexName);
        request.type("_doc");
        // 文档内容,source传参,第一种时按照 fieldName, fieldValue 成对的方式传入;下面是采用json串 + 指定ContentType的方式传入
        request.source(JSON.toJSONString(obj), XContentType.JSON);
        // 指定特殊的id,不指定时自动生成id
        request.id(id);
        IndexResponse response = client.index(request, requestOptions);
        System.out.println("添加数据返回结果: " + response.toString());
    }
}

添加数据,注意是利用 IndexRequest 来构建请求对象,添加文档时有几个注意事项

  • request.source() : 具体需要上传的文档,就是通过它挂上去的,我们这里采用的是json方式
  • request.id(): 如果上传的文档需要指定id,则可以使用它;若未指定,则表明自动生成id

发起请求: client.index()

3. 查询数据

这里先介绍一个基础的根据id进行查询的实例case,更多的查询姿势后面会详细介绍

/**
 * 查询结果
 *
 * @param indexName
 * @param id
 * @throws Exception
 */
public void get(String indexName, String id) throws IOException {
    GetRequest getRequest = new GetRequest(indexName, "_doc", id);
    GetResponse response = client.get(getRequest, requestOptions);
    System.out.println("查询结果:" + response.toString());
}

3. 增量更新数据

根据主键进行更新文档,如下

/**
 * 更新文档,根据id进行更新,增量更新,存在的字段,覆盖;新增的字段,插入;旧字段,保留
 *
 * @param indexName
 * @param docId
 * @param obj
 * @throws IOException
 */
public void updateDoc(String indexName, String docId, Object obj) throws IOException {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(indexName);
    updateRequest.type("_doc");
    updateRequest.id(docId);
    // 设置数据
    updateRequest.doc(JSON.toJSONString(obj), XContentType.JSON);

    UpdateResponse response = client.update(updateRequest, requestOptions);
    System.out.println("更新数据返回:" + response.toString());
}

注意

  • 上面的实现属于增量更新策略
  • 即:新传的文档,若key之前已经存在,则覆盖更新;若之前不存在,则插入;之前文档中未被覆盖的数据依然保留

4. 全量更新

另外一个根据条件进行更新的使用case如下

/**
 * 条件更新
 *
 * @param indexName
 * @param query
 * @param data
 * @throws IOException
 */
public void updateByCondition(String indexName, Map<String, String> query, Map<String, Object> data) throws IOException {
    UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(indexName);
    for (Map.Entry<String, String> entry : query.entrySet()) {
        QueryBuilder queryBuilder = new TermQueryBuilder(entry.getKey(), entry.getValue());
        updateRequest.setQuery(queryBuilder);
    }

    // 更新值脚本,精确的更新方式
    // ctx._source['xx'].add('新增字段')
    // 条件判定 if(ctx._source.addr == 'hubei') { ctx._source.addr = 'wuhan';}
    String source = "ctx._source.name='1hui';";
    Script script = new Script(source);
    updateRequest.setScript(script);

    BulkByScrollResponse response = client.updateByQuery(updateRequest, requestOptions);
    System.out.println("条件更新返回: " + response.toString());
    get(indexName, TEST_ID);
    System.out.println("0---------------------0");

    // 采用全量覆盖式更新,直接使用data中的数据,覆盖之前的文档内容
    source = "ctx._source=params";
    script = new Script(ScriptType.INLINE, "painless", source, data);
    updateRequest.setScript(script);
    response = client.updateByQuery(updateRequest, requestOptions);
    System.out.println("条件更新返回: " + response.toString());
    get(indexName, TEST_ID);
}

5. 删除数据

直接根据id进行删除

/**
 * 根据id进行删除
 *
 * @param indexName
 * @param id
 * @throws IOException
 */
public void delete(String indexName, String id) throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest(indexName);
    deleteRequest.type("_doc");
    deleteRequest.id(id);

    DeleteResponse response = client.delete(deleteRequest, requestOptions);
    System.out.println("删除后返回" + response.toString());
}

6. 条件删除数据

根据条件进行匹配删除

/**
 * 条件删除
 *
 * @param indexName
 * @param query
 * @throws IOException
 */
public void deleteByQuery(String indexName, Map<String, String> query) throws IOException {
    DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
    request.types("_doc");
    for (Map.Entry<String, String> entry : query.entrySet()) {
        QueryBuilder queryBuilder = new TermQueryBuilder(entry.getKey(), entry.getValue());
        request.setQuery(queryBuilder);
    }
    BulkByScrollResponse response = client.deleteByQuery(request, requestOptions);
    System.out.println("条件删除:" + response.toString());
    get(indexName, TEST_ID);
}

7. 测试case

写一个测试demo,将上面的case都跑一遍

public void testOperate() throws IOException {
    String index = "basic_demo";
    Map<String, Object> doc = newMap("name", "一灰灰", "age", 10, "skills", Arrays.asList("java", "python"));
    // 新增
    addDoc(index, doc, TEST_ID);
    // 查询
    get(index, TEST_ID);
    // 更新
    doc.clear();
    doc.put("name", "一灰灰blog");
    doc.put("addr", "hubei");
    updateDoc(index, TEST_ID, doc);
    get(index, TEST_ID);

    updateByCondition(index, newMap("addr", "hubei"), newMap("name", "yihuihui", "site", "https://hhui.top"));
    get(index, TEST_ID);

    // 删除文档
    delete(index, TEST_ID);
}

public <K, V> Map<K, V> newMap(K k, V v, Object... kv) {
    Map<K, V> map = new HashMap<>();
    map.put(k, v);
    for (int i = 0; i < kv.length; i += 2) {
        map.put((K) kv[i], (V) kv[i + 1]);
    }
    return map;
}

输出如下

# 1. 添加数据
添加数据返回结果: IndexResponse[index=basic_demo,type=_doc,id=11123-33345-66543-55231,version=1,result=created,seqNo=34,primaryTerm=4,shards={"total":2,"successful":1,"failed":0}]

# 2. 查询数据
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":1,"_seq_no":34,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"一灰灰","age":10}}

# 3. 增量更新
2022-03-28 19:56:08.781  WARN 18332 --- [/O dispatcher 1] org.elasticsearch.client.RestClient      : request [POST http://localhost:9200/basic_demo/_doc/11123-33345-66543-55231/_update?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.12.0-78722783c38caa25a70982b5b042074cde5d3b3a "[types removal] Specifying types in document update requests is deprecated, use the endpoint /{index}/_update/{id} instead."]
更新数据返回:UpdateResponse[index=basic_demo,type=_doc,id=11123-33345-66543-55231,version=2,seqNo=35,primaryTerm=4,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":2,"_seq_no":35,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"一灰灰blog","age":10,"addr":"hubei"}}

# 4. 全量条件更新
条件更新返回: BulkByScrollResponse[took=970ms,timed_out=false,sliceId=null,updated=1,created=0,deleted=0,batches=1,versionConflicts=0,noops=0,retries=0,throttledUntil=0s,bulk_failures=[],search_failures=[]]
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":3,"_seq_no":36,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"1hui","addr":"hubei","age":10}}

III. 不能错过的源码和相关知识点

0. 项目

系列博文

源码

1. 微信公众号: 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog
一灰灰blog
Loading...