From 8059356b8384b0a637342dc04fc8a42561b06d8e Mon Sep 17 00:00:00 2001 From: Vincent Royer Date: Mon, 30 Oct 2017 09:02:20 +0100 Subject: [PATCH] Fix multi-node spark cluster inconsictency --- .../org/elasticsearch/hadoop/rest/RestService.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java index 418ad836..92317f56 100644 --- a/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java +++ b/mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java @@ -418,7 +418,12 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio EsMajorVersion version = InitializationUtils.discoverEsVersion(settings, log); ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings); // initialize REST client - RestRepository repository = new RestRepository(settings); + //RestRepository repository = new RestRepository(settings); + // force connection to the node having the partition. + Settings nodeSettings = settings.copy().setNodes(partition.getLocations()[0]); + SettingsUtils.setDiscoveredNodes(nodeSettings, null); + RestRepository repository = new RestRepository(nodeSettings); + Field fieldMapping = null; if (StringUtils.hasText(partition.getSerializedMapping())) { fieldMapping = IOUtils.deserializeFromBase64(partition.getSerializedMapping()); @@ -450,7 +455,8 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio .limit(settings.getScrollLimit()) .fields(SettingsUtils.determineSourceFields(settings)) .filters(QueryUtils.parseFilters(settings)) - .shard(Integer.toString(partition.getShardId())) + //.shard(Integer.toString(partition.getShardId())) + .shard("0") .local(true) .excludeSource(settings.getExcludeSource()); if (partition.getSlice() != null && partition.getSlice().max > 1) {