Skip to content

Commit

Permalink
Implement block prefiltering for Joins of a lazy result + `IndexSca…
Browse files Browse the repository at this point in the history
…n` (#1647)

With this PR, a join between a lazy intermediate result and an `IndexScan` prefilters the blocks of the `IndexScan` before reading them. Before this PR, this prefilter was only applied to joins between two `IndexScan`s and between an `IndexScan` and a fully materialized intermediate result.
  • Loading branch information
RobinTF authored Dec 3, 2024
1 parent 5c0d28a commit 44e2ba8
Show file tree
Hide file tree
Showing 9 changed files with 739 additions and 154 deletions.
206 changes: 202 additions & 4 deletions src/engine/IndexScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "engine/IndexScan.h"

#include <absl/container/inlined_vector.h>
#include <absl/strings/str_join.h>

#include <boost/optional.hpp>
Expand All @@ -14,6 +15,7 @@
#include "parser/ParsedQuery.h"

using std::string;
using LazyScanMetadata = CompressedRelationReader::LazyScanMetadata;

// _____________________________________________________________________________
// Return the number of `Variables` given the `TripleComponent` values for
Expand Down Expand Up @@ -442,15 +444,211 @@ Permutation::IdTableGenerator IndexScan::lazyScanForJoinOfColumnWithScan(
AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);
AD_CONTRACT_CHECK(joinColumn.empty() || !joinColumn[0].isUndefined());

auto metaBlocks1 = getMetadataForScan();
auto metaBlocks = getMetadataForScan();

if (!metaBlocks1.has_value()) {
if (!metaBlocks.has_value()) {
return {};
}
auto blocks = CompressedRelationReader::getBlocksForJoin(joinColumn,
metaBlocks1.value());
metaBlocks.value());

auto result = getLazyScan(blocks);
result.details().numBlocksAll_ = metaBlocks1.value().blockMetadata_.size();
result.details().numBlocksAll_ = metaBlocks.value().blockMetadata_.size();
return result;
}

// _____________________________________________________________________________
// Copy the statistics collected in `metadata` during a lazy scan into the
// runtime information of this operation. The status is set to
// `lazilyMaterialized`. The counters for read blocks, all blocks and read
// elements are always reported, the remaining counters only if they are
// greater than zero.
void IndexScan::updateRuntimeInfoForLazyScan(const LazyScanMetadata& metadata) {
  updateRuntimeInformationWhenOptimizedOut(
      RuntimeInformation::Status::lazilyMaterialized);

  auto& info = runtimeInfo();
  info.numRows_ = metadata.numElementsYielded_;
  info.totalTime_ = metadata.blockingTime_;

  // Details that are reported unconditionally.
  info.addDetail("num-blocks-read", metadata.numBlocksRead_);
  info.addDetail("num-blocks-all", metadata.numBlocksAll_);
  info.addDetail("num-elements-read", metadata.numElementsRead_);

  // Details that are only reported when the respective counter is positive.
  if (metadata.numBlocksSkippedBecauseOfGraph_ > 0) {
    info.addDetail("num-blocks-skipped-graph",
                   metadata.numBlocksSkippedBecauseOfGraph_);
  }
  if (metadata.numBlocksPostprocessed_ > 0) {
    info.addDetail("num-blocks-postprocessed",
                   metadata.numBlocksPostprocessed_);
  }
  if (metadata.numBlocksWithUpdate_ > 0) {
    info.addDetail("num-blocks-with-update", metadata.numBlocksWithUpdate_);
  }
}

// Store a generator and its corresponding iterator as well as the values that
// were already consumed from the generator but have not been re-yielded yet.
// The state is shared (via `std::shared_ptr`) between the two cooperating
// generators created by `createPrefilteredJoinSide` and
// `createPrefilteredIndexScanSide`: whichever of them runs out of data calls
// `fetch()`, which fills `prefetchedValues_` (for the join side) and
// `pendingBlocks_` (for the index scan side).
struct IndexScan::SharedGeneratorState {
  // The generator that yields the tables to be joined with the index scan.
  Result::Generator generator_;
  // The column index of the join column in the tables yielded by the
  // generator.
  ColumnIndex joinColumn_;
  // Metadata and blocks of this index scan.
  Permutation::MetadataAndBlocks metaBlocks_;
  // The iterator of the generator that is currently being consumed. It is
  // `std::nullopt` until the generator has been started.
  std::optional<Result::Generator::iterator> iterator_ = std::nullopt;
  // Values returned by the generator that have not been re-yielded yet.
  // Typically we expect only 3 or less values to be prefetched (this is an
  // implementation detail of `BlockZipperJoinImpl`).
  using PrefetchStorage = absl::InlinedVector<Result::IdTableVocabPair, 3>;
  PrefetchStorage prefetchedValues_{};
  // Metadata of blocks that still need to be read.
  std::vector<CompressedBlockMetadata> pendingBlocks_{};
  // The index of the last matching block that was found using the join
  // column.
  std::optional<size_t> lastBlockIndex_ = std::nullopt;
  // Indicates if the generator has yielded any undefined values.
  bool hasUndef_ = false;
  // Indicates if the generator has been fully consumed.
  bool doneFetching_ = false;

  // Advance the `iterator` to the next non-empty table. Set `hasUndef_` to
  // true if the first table is undefined. Also set `doneFetching_` if the
  // generator has been fully consumed.
  void advanceInputToNextNonEmptyTable() {
    bool firstStep = !iterator_.has_value();
    if (iterator_.has_value()) {
      ++iterator_.value();
    } else {
      iterator_ = generator_.begin();
    }
    auto& iterator = iterator_.value();
    while (iterator != generator_.end()) {
      if (!iterator->idTable_.empty()) {
        break;
      }
      ++iterator;
    }
    doneFetching_ = iterator_ == generator_.end();
    // Set the undef flag if the first table is undefined. Only the first row
    // of the join column of the first non-empty table is inspected.
    if (firstStep) {
      hasUndef_ =
          !doneFetching_ && iterator->idTable_.at(0, joinColumn_).isUndefined();
    }
  }

  // Consume tables from the generator until one of them matches at least one
  // block of the index scan, and enqueue that table as well as the newly
  // matching blocks. When this function returns, exactly one of the following
  // holds:
  // 1. `doneFetching_` is true (the input is exhausted).
  // 2. `hasUndef_` is true (nothing was enqueued, the callers handle the
  //    UNDEF case separately).
  // 3. A table has been appended to `prefetchedValues_`.
  // Input tables that match no block are dropped, because they cannot
  // contribute to the join result. Previously the function returned after a
  // single non-matching table without enqueuing anything, which violated the
  // invariant that `createPrefilteredJoinSide` asserts after calling
  // `fetch()`.
  void fetch() {
    while (true) {
      advanceInputToNextNonEmptyTable();
      if (doneFetching_) {
        return;
      }
      auto& idTable = iterator_.value()->idTable_;
      auto joinColumn = idTable.getColumn(joinColumn_);
      AD_EXPENSIVE_CHECK(std::ranges::is_sorted(joinColumn));
      AD_CORRECTNESS_CHECK(!joinColumn.empty());
      // Skip processing for undef case, it will be handled differently
      if (hasUndef_) {
        return;
      }
      AD_CORRECTNESS_CHECK(!joinColumn[0].isUndefined());
      auto newBlocks =
          CompressedRelationReader::getBlocksForJoin(joinColumn, metaBlocks_);
      if (newBlocks.empty()) {
        // The current input table matches no blocks, so we don't have to
        // yield it. Continue with the next input table.
        continue;
      }
      prefetchedValues_.push_back(std::move(*iterator_.value()));
      // Skip the blocks that were already enqueued for a previous table, i.e.
      // all blocks whose index is not greater than `lastBlockIndex_`. This
      // relies on `newBlocks` being sorted by `blockIndex_`.
      auto startIterator =
          lastBlockIndex_.has_value()
              ? std::ranges::upper_bound(newBlocks, lastBlockIndex_.value(), {},
                                         &CompressedBlockMetadata::blockIndex_)
              : newBlocks.begin();
      lastBlockIndex_ = newBlocks.back().blockIndex_;
      std::ranges::move(startIterator, newBlocks.end(),
                        std::back_inserter(pendingBlocks_));
      return;
    }
  }

  // Check if there are any undefined values yielded by the original
  // generator. If the generator hasn't been started to get consumed, this
  // will start it.
  bool hasUndef() {
    if (!iterator_.has_value()) {
      fetch();
    }
    return hasUndef_;
  }
};

// _____________________________________________________________________________
// Coroutine that re-yields the tables of the generator wrapped by
// `innerState`. It shares `innerState` with the generator created by
// `createPrefilteredIndexScanSide`; either of the two may call
// `innerState->fetch()`, which appends tables to `prefetchedValues_`.
// If the input contains undefined values (as reported by `hasUndef()`), all
// remaining tables of the input are yielded unchanged. Otherwise only the
// tables that `fetch()` stored in `prefetchedValues_` are yielded.
Result::Generator IndexScan::createPrefilteredJoinSide(
std::shared_ptr<SharedGeneratorState> innerState) {
// UNDEF case: `hasUndef()` starts the input generator if necessary. Nothing
// has been prefetched in this case, so we simply forward everything from the
// current position of the input iterator to the end.
if (innerState->hasUndef()) {
AD_CORRECTNESS_CHECK(innerState->prefetchedValues_.empty());
for (auto& value : std::ranges::subrange{innerState->iterator_.value(),
innerState->generator_.end()}) {
co_yield value;
}
co_return;
}
auto& prefetchedValues = innerState->prefetchedValues_;
while (true) {
// Only fetch new input if there is nothing left to yield. The generator
// terminates when the buffer is empty and the input is exhausted.
if (prefetchedValues.empty()) {
if (innerState->doneFetching_) {
co_return;
}
innerState->fetch();
// After `fetch()` there must be something to yield, unless the input is
// exhausted.
AD_CORRECTNESS_CHECK(!prefetchedValues.empty() ||
innerState->doneFetching_);
}
// Make a defensive copy of the values to avoid modification during
// iteration when yielding.
auto copy = std::move(prefetchedValues);
// Moving out does not necessarily clear the values, so we do it explicitly.
prefetchedValues.clear();
for (auto& value : copy) {
co_yield value;
}
}
}

// _____________________________________________________________________________
// Coroutine that yields the tables of this index scan, restricted to the
// blocks that `innerState->fetch()` stored in `pendingBlocks_`. It shares
// `innerState` with the generator created by `createPrefilteredJoinSide`.
// If the input contains undefined values (as reported by `hasUndef()`), the
// result of `chunkedIndexScan()` is yielded instead and no block filtering is
// applied. Each table from a lazy scan is yielded together with an empty
// `LocalVocab`.
Result::Generator IndexScan::createPrefilteredIndexScanSide(
std::shared_ptr<SharedGeneratorState> innerState) {
// UNDEF case: `hasUndef()` starts the input generator if necessary. Note that
// the runtime information is not updated on this path.
if (innerState->hasUndef()) {
for (auto& pair : chunkedIndexScan()) {
co_yield pair;
}
co_return;
}
// Accumulates the statistics of all lazy scans performed below.
LazyScanMetadata metadata;
auto& pendingBlocks = innerState->pendingBlocks_;
while (true) {
// Only fetch new input if there are no blocks left to read. When the input
// is exhausted as well, store the accumulated statistics in the runtime
// information and terminate.
if (pendingBlocks.empty()) {
if (innerState->doneFetching_) {
metadata.numBlocksAll_ = innerState->metaBlocks_.blockMetadata_.size();
updateRuntimeInfoForLazyScan(metadata);
co_return;
}
innerState->fetch();
}
// Hand all currently pending blocks to the lazy scan. The check below
// asserts that `pendingBlocks` is empty after the move, so that blocks
// enqueued while this coroutine is suspended are read by a later scan.
auto scan = getLazyScan(std::move(pendingBlocks));
AD_CORRECTNESS_CHECK(pendingBlocks.empty());
for (IdTable& idTable : scan) {
co_yield {std::move(idTable), LocalVocab{}};
}
// The details of the scan are aggregated after it has been consumed
// completely.
metadata.aggregate(scan.details());
}
}

// _____________________________________________________________________________
// Create the pair of cooperating generators (join side, index scan side) for
// a join of the lazy `input` with this index scan on column `joinColumn`.
// Both generators share a single `SharedGeneratorState`. If there is no
// metadata for this scan, two default-constructed generators are returned.
std::pair<Result::Generator, Result::Generator> IndexScan::prefilterTables(
    Result::Generator input, ColumnIndex joinColumn) {
  AD_CORRECTNESS_CHECK(numVariables_ <= 3 && numVariables_ > 0);

  auto scanMetadata = getMetadataForScan();
  if (scanMetadata.has_value()) {
    auto sharedState = std::make_shared<SharedGeneratorState>(
        std::move(input), joinColumn, std::move(scanMetadata.value()));
    // Create the join side first, then the index scan side.
    auto joinSide = createPrefilteredJoinSide(sharedState);
    auto indexScanSide = createPrefilteredIndexScanSide(sharedState);
    return {std::move(joinSide), std::move(indexScanSide)};
  }
  return {Result::Generator{}, Result::Generator{}};
}
30 changes: 30 additions & 0 deletions src/engine/IndexScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,32 @@ class IndexScan final : public Operation {
Permutation::IdTableGenerator lazyScanForJoinOfColumnWithScan(
std::span<const Id> joinColumn) const;

// Return two generators. The first one re-yields the tables of `input`
// (tables that match no block of this index scan may be omitted). The second
// one yields the matching blocks of this index scan, skipping the blocks that
// consist only of rows that don't match the tables yielded by `input`. This
// speeds up join algorithms when no undef values are present. When there are
// undef values, the second generator represents the full index scan.
std::pair<Result::Generator, Result::Generator> prefilterTables(
Result::Generator input, ColumnIndex joinColumn);

private:
// Implementation detail that allows to consume a generator from two other
// cooperating generators. Needs to be forward declared as it is used by
// several member functions below.
struct SharedGeneratorState;

// Helper function that creates a generator that re-yields the generator
// wrapped by `innerState`.
static Result::Generator createPrefilteredJoinSide(
std::shared_ptr<SharedGeneratorState> innerState);

// Helper function that creates a generator yielding prefiltered rows of this
// index scan according to the block metadata, that match the tables yielded
// by the generator wrapped by `innerState`.
Result::Generator createPrefilteredIndexScanSide(
std::shared_ptr<SharedGeneratorState> innerState);

// TODO<joka921> Make the `getSizeEstimateBeforeLimit()` function `const` for
// ALL the `Operations`.
uint64_t getSizeEstimateBeforeLimit() override { return sizeEstimate_; }
Expand Down Expand Up @@ -145,6 +170,11 @@ class IndexScan final : public Operation {
ScanSpecification getScanSpecification() const;
ScanSpecificationAsTripleComponent getScanSpecificationTc() const;

// Set the runtime information of this index scan from `metadata` when the
// scan was lazily executed during a join.
void updateRuntimeInfoForLazyScan(
const CompressedRelationReader::LazyScanMetadata& metadata);

private:
ProtoResult computeResult(bool requestLaziness) override;

Expand Down
Loading

0 comments on commit 44e2ba8

Please sign in to comment.