Skip to content

Commit

Permalink
Merge pull request #4581 from effective-webwork/elastic-search-update
Browse files Browse the repository at this point in the history
Elastic search update
  • Loading branch information
Kathrin-Huber authored Sep 9, 2021
2 parents b6111ae + 21b57a3 commit 9d8cd61
Show file tree
Hide file tree
Showing 34 changed files with 1,319 additions and 1,254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ public void setConfigProductionDateShow(boolean configProductionDateShow) {
this.configProductionDateShow = configProductionDateShow;
}

/**
* Get Metadata language.
*
* @return metadata language as String
*/
public String getMetadataLanguage() {
if (Objects.isNull(this.metadataLanguage)) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
package org.kitodo.data.elasticsearch;

import java.io.IOException;
import java.util.Collections;
import java.util.Arrays;
import java.util.List;

import javax.ws.rs.HttpMethod;

Expand All @@ -28,28 +29,33 @@
import org.apache.http.util.EntityUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.main.MainResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.MainResponse;
import org.elasticsearch.rest.RestStatus;
import org.kitodo.config.ConfigMain;
import org.kitodo.data.elasticsearch.api.RestClientInterface;
import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;

/**
* Implementation of Elastic Search REST Client for Index Module.
* Implementation of ElasticSearch REST Client for Index Module.
*/
public abstract class KitodoRestClient implements RestClientInterface {

private static final Logger logger = LogManager.getLogger(KitodoRestClient.class);

protected String index;
protected String indexBase;
protected RestClient client;
protected RestHighLevelClient highLevelClient;

public static final List<String> MAPPING_TYPES = Arrays.asList("batch", "docket", "filter", "process", "project",
"property", "ruleset", "task", "template", "workflow");

/**
* Create REST client.
*
Expand Down Expand Up @@ -85,7 +91,7 @@ private void initiateClient(String host, Integer port, String protocol, String p
}

client = builder.build();
highLevelClient = new RestHighLevelClient(client);
highLevelClient = new RestHighLevelClient(builder);
}
}

Expand Down Expand Up @@ -115,7 +121,7 @@ private void initiateClientWithAuth(String host, Integer port, String protocol,
builder.setPathPrefix(path);
}
client = builder.build();
highLevelClient = new RestHighLevelClient(client);
highLevelClient = new RestHighLevelClient(builder);
}

/**
Expand All @@ -124,7 +130,9 @@ private void initiateClientWithAuth(String host, Integer port, String protocol,
* @return information about the server
*/
public String getServerInformation() throws IOException {
Response response = client.performRequest(HttpMethod.GET, "/", Collections.singletonMap("pretty", "true"));
Request request = new Request(HttpMethod.GET, "/");
request.addParameter("pretty", "true");
Response response = client.performRequest(request);
return EntityUtils.toString(response.getEntity());
}

Expand All @@ -134,81 +142,106 @@ public String getServerInformation() throws IOException {
* @return information about the server
*/
public String getServerInfo() throws IOException {
MainResponse response = highLevelClient.info();
MainResponse response = highLevelClient.info(RequestOptions.DEFAULT);
return response.toString();
}

/**
* Get mapping.
*
* @param mappingType
* the name of table in database as String
* @return mapping
*/
public String getMapping() throws IOException {
Response response = client.performRequest(HttpMethod.GET, "/" + index + "/_mapping",
Collections.singletonMap("pretty", "true"));
public String getMapping(String mappingType) throws IOException {
Request request = new Request(HttpMethod.GET, "/" + indexBase + "_" + mappingType + "/_mapping");
request.addParameter("pretty", "true");
Response response = client.performRequest(request);
return EntityUtils.toString(response.getEntity());
}

/**
* Create new index without mapping.
* Create new indexes without mappings.
*/
public boolean createIndex() throws IOException, CustomResponseException {
return createIndex(null);
public void createIndexes() throws IOException, CustomResponseException {
for (String mappingType : MAPPING_TYPES) {
createIndex(null, mappingType);
}
}

/**
* Create new index with mapping.
*
* @param query
* contains mapping
* @param mappingType
* the name of table in database as String
* @return true or false - can be used for displaying information to user if
* success
*/
public boolean createIndex(String query) throws IOException, CustomResponseException {
public boolean createIndex(String query, String mappingType) throws IOException, CustomResponseException {
if (query == null) {
query = "{\"settings\" : {\"index\" : {\"number_of_shards\" : 1,\"number_of_replicas\" : 0}}}";
}
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
Response indexResponse = client.performRequest(HttpMethod.PUT, "/" + index, Collections.emptyMap(), entity);
Request request = new Request(HttpMethod.PUT, "/" + indexBase + "_" + mappingType);
request.setEntity(entity);
Response indexResponse = client.performRequest(request);
int statusCode = processStatusCode(indexResponse.getStatusLine());
return statusCode == 200 || statusCode == 201;
}

/**
* Check if index already exists. Needed for frontend.
* Check if all indexes already exist. Needed for frontend.
*
* @return false if doesn't exists, true if exists
* @return false if any index doesn't exist, true else
*/
public boolean indexExists() throws IOException, CustomResponseException {
Response indexResponse = client.performRequest(HttpMethod.GET, "/" + index, Collections.emptyMap());
int statusCode = processStatusCode(indexResponse.getStatusLine());
return statusCode == 200 || statusCode == 201;
public boolean typeIndexesExist() throws IOException, CustomResponseException {
for (String mappingType : MAPPING_TYPES) {
Response indexResponse = client.performRequest(new Request(HttpMethod.GET, "/" + indexBase + "_" + mappingType));
int statusCode = processStatusCode(indexResponse.getStatusLine());
if (statusCode != 200 && statusCode != 201) {
return false;
}
}
return true;
}

/**
* Delete the whole index. Used for cleaning after tests!
* Delete index of given mappingType. Currently, only used in test cases.
* @param mappingType mapping type
*/
public void deleteIndex() throws IOException {
client.performRequest(HttpMethod.DELETE, "/" + index);
public void deleteIndex(String mappingType) throws IOException {
client.performRequest(new Request(HttpMethod.DELETE, "/" + indexBase + "_" + mappingType));
}

/**
* Delete all indexes. Used for cleaning after tests!
*/
public void deleteAllIndexes() throws IOException {
for (String mappingType : MAPPING_TYPES) {
client.performRequest(new Request(HttpMethod.DELETE, "/" + indexBase + "_" + mappingType));
}
}

/**
* Getter for index.
* Getter for indexBase.
* The indexBase is the prefix of all indexes - equal to the name of database, default kitodo.
*
* @return index name
* @return indexBase name
*/
public String getIndex() {
return index;
public String getIndexBase() {
return indexBase;
}

/**
* Setter for index.
* Setter for indexBase. The indexBase is the prefix of all indexes
*
* @param index
* @param indexBase
* - equal to the name of database, default kitodo
*/
public void setIndex(String index) {
this.index = index;
public void setIndexBase(String indexBase) {
this.indexBase = indexBase;
}

protected void handleResponseException(ResponseException e) throws CustomResponseException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package org.kitodo.data.elasticsearch.index;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

Expand All @@ -27,14 +26,16 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.kitodo.data.elasticsearch.KitodoRestClient;
import org.kitodo.data.elasticsearch.exceptions.CustomResponseException;
import org.kitodo.data.exceptions.DataException;

/**
* Implementation of Elastic Search REST Client for index package.
* Implementation of ElasticSearch REST Client for index package.
*/
public class IndexRestClient extends KitodoRestClient {

Expand Down Expand Up @@ -83,12 +84,13 @@ public static IndexRestClient getInstance() {
*/
public void addDocument(String type, Map<String, Object> entity, Integer id, boolean forceRefresh)
throws IOException, CustomResponseException {
IndexRequest indexRequest = new IndexRequest(this.index, type, String.valueOf(id)).source(entity);
IndexRequest indexRequest = new IndexRequest(this.indexBase + "_" + type).source(entity);
indexRequest.id(String.valueOf(id));
if (forceRefresh) {
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}

IndexResponse indexResponse = highLevelClient.index(indexRequest);
IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
processStatusCode(indexResponse.status());
}

Expand All @@ -105,7 +107,7 @@ void addTypeSync(String type, Map<Integer, Map<String, Object>> documentsToIndex
BulkRequest bulkRequest = prepareBulkRequest(type, documentsToIndex);

try {
BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest);
BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
throw new CustomResponseException(bulkResponse.buildFailureMessage());
}
Expand All @@ -127,7 +129,7 @@ void addTypeAsync(String type, Map<Integer, Map<String, Object>> documentsToInde
BulkRequest bulkRequest = prepareBulkRequest(type, documentsToIndex);

ResponseListener responseListener = new ResponseListener(type, documentsToIndex.size());
highLevelClient.bulkAsync(bulkRequest, responseListener);
highLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, responseListener);

synchronized (lock) {
while (Objects.isNull(responseListener.getBulkResponse())) {
Expand All @@ -142,7 +144,7 @@ void addTypeAsync(String type, Map<Integer, Map<String, Object>> documentsToInde
}

/**
* Delete document from the index.
* Delete document from type specific index.
*
* @param type
* for which request is performed
Expand All @@ -153,13 +155,14 @@ void addTypeAsync(String type, Map<Integer, Map<String, Object>> documentsToInde
* object is right after that available for display
*/
void deleteDocument(String type, Integer id, boolean forceRefresh) throws CustomResponseException, DataException {
DeleteRequest deleteRequest = new DeleteRequest(this.index, type, String.valueOf(id));
DeleteRequest deleteRequest = new DeleteRequest(this.indexBase + "_" + type);
deleteRequest.id(String.valueOf(id));
if (forceRefresh) {
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
}

try {
highLevelClient.delete(deleteRequest);
highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (ResponseException e) {
handleResponseException(e);
} catch (IOException e) {
Expand All @@ -170,26 +173,29 @@ void deleteDocument(String type, Integer id, boolean forceRefresh) throws Custom
/**
* Enable sorting by text field.
*
* @param type
* as String
* @param field
* as String
* @param mappingType
* as String
*/
public void enableSortingByTextField(String type, String field) throws IOException, CustomResponseException {
public void enableSortingByTextField(String field, String mappingType) throws IOException, CustomResponseException {
String query = "{\n \"properties\": {\n\"" + field + "\": {\n" + " \"type\": \"text\",\n"
+ " \"fielddata\": true,\n" + " \"fields\": {\n" + " \"raw\": {\n"
+ " \"type\": \"text\",\n" + " \"index\": false}\n" + " }\n" + " }}}";
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
Response indexResponse = client.performRequest(HttpMethod.PUT,
"/" + this.getIndex() + "/_mapping/" + type + "?update_all_types", Collections.emptyMap(), entity);
Request request = new Request(HttpMethod.PUT,
"/" + this.getIndexBase() + "_" + mappingType + "/_mappings");
request.setEntity(entity);
Response indexResponse = client.performRequest(request);
processStatusCode(indexResponse.getStatusLine());
}

private BulkRequest prepareBulkRequest(String type, Map<Integer, Map<String, Object>> documentsToIndex) {
BulkRequest bulkRequest = new BulkRequest();

for (Map.Entry<Integer, Map<String, Object>> entry : documentsToIndex.entrySet()) {
IndexRequest indexRequest = new IndexRequest(this.index, type, String.valueOf(entry.getKey()));
IndexRequest indexRequest = new IndexRequest(this.indexBase + "_" + type);
indexRequest.id(String.valueOf(entry.getKey()));
bulkRequest.add(indexRequest.source(entry.getValue()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.kitodo.data.exceptions.DataException;

/**
* Implementation of Elastic Search Indexer for index package.
* Implementation of ElasticSearch Indexer for index package.
*/
public class Indexer<T extends BaseIndexedBean, S extends BaseType> extends Index {

Expand Down Expand Up @@ -123,7 +123,7 @@ public void performMultipleRequests(List<T> baseIndexedBeans, S baseType, boolea

private IndexRestClient initiateRestClient() {
IndexRestClient restClient = IndexRestClient.getInstance();
restClient.setIndex(index);
restClient.setIndexBase(index);
return restClient;
}

Expand Down
Loading

0 comments on commit 9d8cd61

Please sign in to comment.