国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Elasticsearch Java API 6.2(文檔API)

lykops / 1632人閱讀

摘要:注意當(dāng)一個文檔在快照的時間和索引請求過程之間發(fā)生變化時,會發(fā)生版本沖突。當(dāng)版本匹配時,更新文檔并增加版本號。在正在運行的更新中,使用更改的值使用查找的值。值加快進程立即生效,減慢查詢的值在完成當(dāng)前批處理后生效,以防止?jié)L動超時。

文檔API

本節(jié)描述以下CRUD API:

單文檔的API

Index API

Get API

Delete API

Update API

多文檔API

Multi Get API

Bulk API

Reindex API

Update By Query API

Delete By Query API

注意
所有CRUD API都是單索引API,索引參數(shù)接受單個索引名,或指向單個索引的別名
Index API

index API允許將類型化的JSON文檔索引到特定的索引中,并使其可搜索。

生成JSON文檔

生成JSON文檔有幾種不同的方法:

手動的(也就是你自己)使用原生byte[]或作為String

使用一個Map,該Map將自動轉(zhuǎn)換為它的JSON等效項

使用第三方庫對bean(如Jackson)進行序列化

使用內(nèi)置的助手XContentFactory.jsonBuilder()

在內(nèi)部,每個類型被轉(zhuǎn)換為byte[](像String被轉(zhuǎn)換為byte[]),因此,如果對象已經(jīng)以這種形式存在,那么就使用它,jsonBuilder是高度優(yōu)化的JSON生成器,它直接構(gòu)造一個byte[]。

自己動手

這里沒有什么困難,但是請注意,您必須根據(jù)日期格式對日期進行編碼。

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";
使用Map

Map是一個鍵:值對集合,它表示一個JSON結(jié)構(gòu):

Map json = new HashMap();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
bean序列化

可以使用Jacksonbean序列化為JSON,請將Jackson Databind添加到您的項目中,然后,您可以使用ObjectMapper來序列化您的bean:

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用Elasticsearch助手

Elasticsearch提供了內(nèi)置的助手來生成JSON內(nèi)容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,您還可以使用startArray(String)endArray()方法添加數(shù)組,順便說一下,field方法接受許多對象類型,您可以直接傳遞數(shù)字、日期甚至其他XContentBuilder對象。

如果需要查看生成的JSON內(nèi)容,可以使用string()方法。

String json = builder.string();
索引文檔

下面的示例將JSON文檔索引為一個名為twitter的索引,其類型為tweet, id值為1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,您還可以將文檔索引為JSON字符串,并且不需要提供ID:

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

IndexResponse對象會給你一個響應(yīng):

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it"s the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

有關(guān)索引操作的更多信息,請查看REST索引文檔

Get API

get API允許根據(jù)索引的id從索引中獲取類型化的JSON文檔,下面的示例從一個名為twitter的索引中獲取JSON文檔,該索引的類型名為tweet, id值為1:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

有關(guān)get操作的更多信息,請查看REST get文檔。

Delete API

delete API允許基于id從特定索引中刪除類型化的JSON文檔,下面的示例從名為twitter的索引中刪除JSON文檔,該索引的類型名為tweet, id值為1:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
Delete By Query API

通過查詢刪除的API可以根據(jù)查詢結(jié)果刪除給定的一組文檔:

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) 
    .source("persons")                                  
    .get();                                             
long deleted = response.getDeleted();

QueryBuilders.matchQuery("gender", "male")(查詢)

source("persons") (索引)

get()(執(zhí)行操作)

response.getDeleted()(被刪除的文檔數(shù))

由于這是一個長時間運行的操作,如果您希望異步執(zhí)行,可以調(diào)用execute而不是get,并提供如下監(jiān)聽器:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))     
    .source("persons")                                      
    .execute(new ActionListener() {   
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();           
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
Update API

您可以創(chuàng)建一個UpdateRequest并將其發(fā)送給客戶端:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

也可以使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = "male""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Script()(你的腳本,它也可以是本地存儲的腳本名)

setDoc()(將合并到現(xiàn)有的文檔)

注意,您不能同時提供腳本和doc

使用腳本更新

update API允許基于提供的腳本更新文檔:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = "male""));
client.update(updateRequest).get();
通過合并文檔更新

update API還支持傳遞一個部分文檔合并到現(xiàn)有文檔中(簡單的遞歸合并,內(nèi)部合并對象,取代核心的“鍵/值”和數(shù)組),例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();
Upsert

也有對Upsert的支持,如果文檔不存在,則使用upsert元素的內(nèi)容索引新的doc:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

如果文檔不存在,將添加indexRequest中的文檔。

如果文件index/type/1已經(jīng)存在,我們將在此操作后獲得如下文件:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

"gender": "male"(此字段由更新請求添加)

如果不存在,我們將有一份新文件:

{
    "name" : "Joe Smith",
    "gender": "male"
}
Multi Get API

multi get API允許根據(jù)文檔的indextypeid獲取文檔列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}

add("twitter", "tweet", "1")(通過單一id)

add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)

add("another", "type", "foo")(你也可以從另一個索引中得到)

MultiGetItemResponse itemResponse : multiGetItemResponses(迭代結(jié)果集)

response.isExists()(您可以檢查文檔是否存在)

response.getSourceAsString()(訪問_source字段)

有關(guān)multi get操作的更多信息,請查看剩余的multi get文檔

Bulk API

bulk API允許在一個請求中索引和刪除多個文檔,這里有一個示例用法:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
使用Bulk處理器

BulkProcessor類提供了一個簡單的接口,可以根據(jù)請求的數(shù)量或大小自動刷新bulk操作,或者在給定的時間之后。

要使用它,首先創(chuàng)建一個BulkProcessor實例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();

beforeBulk()

此方法在執(zhí)行bulk之前被調(diào)用,例如,您可以通過request.numberOfActions()查看numberOfActions

afterBulk(...BulkResponse response)

此方法在執(zhí)行bulk之后被調(diào)用,例如,您可以通過response.hasFailures()檢查是否存在失敗請求

afterBulk(...Throwable failure)

當(dāng)bulk失敗并引發(fā)一個可拋出對象時,將調(diào)用此方法

setBulkActions(10000)

我們希望每10,000個請求就執(zhí)行一次bulk

setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

我們希望每5MB就flush一次

setFlushInterval(TimeValue.timeValueSeconds(5))

無論請求的數(shù)量是多少,我們都希望每5秒flush一次

setConcurrentRequests(1)

設(shè)置并發(fā)請求的數(shù)量,值為0意味著只允許執(zhí)行一個請求,在積累新的bulk請求時,允許執(zhí)行一個值為1的并發(fā)請求

setBackoffPolicy()

設(shè)置一個自定義的備份策略,該策略最初將等待100ms,以指數(shù)形式增加并重試三次,當(dāng)一個或多個bulk項目請求以EsRejectedExecutionException失敗時,將嘗試重試,該異常表明用于處理請求的計算資源太少,要禁用backoff,請傳遞BackoffPolicy.noBackoff()

默認情況下,BulkProcessor:

bulkActions設(shè)置為1000

bulkSize設(shè)置為5mb

不設(shè)置flushInterval

concurrentrequest設(shè)置為1,這意味著flush操作的異步執(zhí)行

backoffPolicy設(shè)置為一個指數(shù)備份,8次重試,啟動延時為50ms,總等待時間約為5.1秒

添加請求

然后您可以簡單地將您的請求添加到BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
關(guān)閉Bulk Processor

當(dāng)所有的文檔都被加載到BulkProcessor,可以使用awaitCloseclose方法進行關(guān)閉:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

如果通過設(shè)置flushInterval來調(diào)度其他計劃的flush,這兩種方法都將flush所有剩余的文檔,并禁用所有其他計劃flush。如果并發(fā)請求被啟用,那么awaitClose方法等待指定的超時以完成所有bulk請求,然后返回true,如果在所有bulk請求完成之前指定的等待時間已經(jīng)過去,則返回false,close方法不等待任何剩余的批量請求完成并立即退出。

在測試中使用Bulk Processor

如果您正在使用Elasticsearch運行測試,并且正在使用BulkProcessor來填充數(shù)據(jù)集,那么您最好將并發(fā)請求的數(shù)量設(shè)置為0,以便以同步方式執(zhí)行批量的flush操作:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don"t need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
Update By Query API

updateByQuery最簡單的用法是在不更改源的情況下更新索引中的每個文檔,這種用法允許獲取一個新屬性或另一個在線映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

updateByQuery API的調(diào)用從獲取索引快照開始,索引使用內(nèi)部版本控制找到任何文檔。

注意
當(dāng)一個文檔在快照的時間和索引請求過程之間發(fā)生變化時,會發(fā)生版本沖突。

當(dāng)版本匹配時,updateByQuery更新文檔并增加版本號。

所有更新和查詢失敗都會導(dǎo)致updateByQuery中止,這些故障可以從BulkByScrollResponse#getIndexingFailures方法中獲得,任何成功的更新仍然存在,并且不會回滾,當(dāng)?shù)谝淮问?dǎo)致中止時,響應(yīng)包含由失敗的bulk請求生成的所有失敗。

為了防止版本沖突導(dǎo)致updateByQuery中止,請設(shè)置abortOnVersionConflict(false),第一個示例之所以這樣做,是因為它試圖獲取在線映射更改,而版本沖突意味著在相同時間開始updateByQuery和試圖更新文檔的沖突文檔。這很好,因為該更新將獲取在線映射更新。

UpdateByQueryRequestBuilder API支持過濾更新的文檔,限制要更新的文檔總數(shù),并使用腳本更新文檔:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = "absolutely"", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder還允許直接訪問用于選擇文檔的查詢,您可以使用此訪問來更改默認的滾動大小,或者以其他方式修改對匹配文檔的請求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您還可以將大小與排序相結(jié)合以限制文檔的更新:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了更改文檔的_source字段外,還可以使用腳本更改操作,類似于Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == "absolutely) {"
            + "  ctx.op="noop""
            + "} else if (ctx._source.awesome == "lame") {"
            + "  ctx.op="delete""
            + "} else {"
            + "ctx._source.awesome = "absolutely"}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API中,可以設(shè)置ctx.op的值來更改執(zhí)行的操作:

noop

如果您的腳本沒有做任何更改,設(shè)置ctx.op = "noop",updateByQuery操作將從更新中省略該文檔,這種行為增加了響應(yīng)主體中的noop計數(shù)器。

delete

如果您的腳本決定必須刪除該文檔,設(shè)置ctx.op = "delete",刪除將在響應(yīng)主體中已刪除的計數(shù)器中報告。

ctx.op設(shè)置為任何其他值都會產(chǎn)生錯誤,在ctx中設(shè)置任何其他字段都會產(chǎn)生錯誤。

這個API不允許您移動它所接觸的文檔,只是修改它們的源,這是故意的!我們沒有規(guī)定要把文件從原來的位置移走。

您也可以同時對多個索引和類型執(zhí)行這些操作,類似于search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果提供路由值,則進程將路由值復(fù)制到滾動查詢,將進程限制為與路由值匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery也可以通過指定這樣的pipeline來使用ingest節(jié)點:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();
使用Task API

您可以使用Task API獲取所有正在運行的update-by-query請求的狀態(tài):

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面所示的TaskId,您可以直接查找任務(wù):

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

使用Cancel Task API

任何查詢更新都可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用list tasks API查找taskId的值。

取消請求通常是一個非??焖俚倪^程,但可能要花費幾秒鐘的時間,task status API繼續(xù)列出任務(wù),直到取消完成。

Rethrottling

在正在運行的更新中,使用_rethrottle API更改requests_per_second的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用list tasks API查找taskId的值。

updateByQuery API一樣,requests_per_second的值可以是任何正值的浮點值來設(shè)置節(jié)流的級別,或者Float.POSITIVE_INFINITY禁用節(jié)流。requests_per_second值加快進程立即生效,減慢查詢的requests_per_second值在完成當(dāng)前批處理后生效,以防止?jié)L動超時。

Reindex API

詳情見reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) 
    .get();

還可以提供查詢來篩選應(yīng)該從源索引到目標(biāo)索引的哪些文檔。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.specialneedsforspecialkids.com/yun/69697.html

相關(guān)文章

  • Elasticsearch Java API 6.2java client)

    摘要:高級客戶端目前支持更常用的,但還有很多東西需要補充,您可以通過告訴我們您的應(yīng)用程序需要哪些缺失的來幫助我們優(yōu)化優(yōu)先級,通過向這個添加注釋高級客戶端完整性。傳輸客戶端排除非數(shù)據(jù)節(jié)點的原因是為了避免將搜索流量發(fā)送給主節(jié)點。 前言 本節(jié)描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客戶端對象執(zhí)行,所有操作本質(zhì)上都是完全異步的(要么接收監(jiān)聽器...

    Gu_Yan 評論0 收藏0
  • Elasticsearch Java High Level REST Client(入門)

    摘要:入門本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級別客戶端。保證能夠與運行在相同主版本和大于或等于的次要版本上的任何節(jié)點通信。與具有相同的發(fā)布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節(jié)描述從獲取工件到在應(yīng)用程序中使用它如何開始使用高級別REST客戶端。 兼容性 Java High Level REST Client需...

    honmaple 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<