Skip to content

Commit

Permalink
Fix multi-node spark cluster inconsictency
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Royer committed Oct 30, 2017
1 parent 7460b4d commit 8059356
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,12 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
EsMajorVersion version = InitializationUtils.discoverEsVersion(settings, log);
ValueReader reader = ObjectUtils.instantiate(settings.getSerializerValueReaderClassName(), settings);
// initialize REST client
RestRepository repository = new RestRepository(settings);
//RestRepository repository = new RestRepository(settings);
// force connection to the node having the partition.
Settings nodeSettings = settings.copy().setNodes(partition.getLocations()[0]);
SettingsUtils.setDiscoveredNodes(nodeSettings, null);
RestRepository repository = new RestRepository(nodeSettings);

Field fieldMapping = null;
if (StringUtils.hasText(partition.getSerializedMapping())) {
fieldMapping = IOUtils.deserializeFromBase64(partition.getSerializedMapping());
Expand Down Expand Up @@ -450,7 +455,8 @@ public static PartitionReader createReader(Settings settings, PartitionDefinitio
.limit(settings.getScrollLimit())
.fields(SettingsUtils.determineSourceFields(settings))
.filters(QueryUtils.parseFilters(settings))
.shard(Integer.toString(partition.getShardId()))
//.shard(Integer.toString(partition.getShardId()))
.shard("0")
.local(true)
.excludeSource(settings.getExcludeSource());
if (partition.getSlice() != null && partition.getSlice().max > 1) {
Expand Down

0 comments on commit 8059356

Please sign in to comment.