Skip to content

Commit

Permalink
fix(elastic): allow more time for re-indexing tasks (#3794)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabe-lyons authored Dec 27, 2021
1 parent 5df5150 commit e7b9379
Showing 1 changed file with 30 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
Expand All @@ -25,11 +27,11 @@
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetMappingsRequest;
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.ReindexRequest;


@Slf4j
@RequiredArgsConstructor
public class ESIndexBuilder {
Expand Down Expand Up @@ -89,9 +91,33 @@ public void buildIndex(String indexName, Map<String, Object> mappings, Map<Strin
String tempIndexName = indexName + "_" + System.currentTimeMillis();
createIndex(tempIndexName, mappings, finalSettings);
try {
searchClient.reindex(
new ReindexRequest().setSourceIndices(indexName).setDestIndex(tempIndexName),
RequestOptions.DEFAULT);
TaskSubmissionResponse reindexTask;
reindexTask = searchClient.submitReindexTask(new ReindexRequest().setSourceIndices(indexName).setDestIndex(tempIndexName),
RequestOptions.DEFAULT);

// wait up to 5 minutes for the task to complete
long startTime = System.currentTimeMillis();
long millisToWait60Minutes = 1000 * 60 * 60;
Boolean reindexTaskCompleted = false;

while ((System.currentTimeMillis() - startTime) < millisToWait60Minutes) {
log.info("Reindexing from {} to {} in progress...", indexName, tempIndexName);
ListTasksRequest request = new ListTasksRequest();
ListTasksResponse tasks = searchClient.tasks().list(request, RequestOptions.DEFAULT);
if (tasks.getTasks().stream().noneMatch(task -> task.getTaskId().toString().equals(reindexTask.getTask()))) {
log.info("Reindexing {} to {} task has completed, will now check if reindex was successful", indexName, tempIndexName);
reindexTaskCompleted = true;
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
log.info("Trouble sleeping while reindexing {} to {}: Exception {}. Retrying...", indexName, tempIndexName, e.toString());
}
}
if (!reindexTaskCompleted) {
throw new RuntimeException(String.format("Reindex from %s to %s failed-- task exceeded 60 minute limit", indexName, tempIndexName));
}
} catch (Exception e) {
log.info("Failed to reindex {} to {}: Exception {}", indexName, tempIndexName, e.toString());
searchClient.indices().delete(new DeleteIndexRequest().indices(tempIndexName), RequestOptions.DEFAULT);
Expand Down

0 comments on commit e7b9379

Please sign in to comment.