From 7d7cb1d6652bf76ec215cf9ae7501a6e35a5923a Mon Sep 17 00:00:00 2001 From: Chris Trevino Date: Tue, 2 Apr 2024 07:23:48 -0700 Subject: [PATCH] streamline invocation pattern necessary for 'run_pipeline_with_config' --- python/graphrag/examples/custom_input/run.py | 2 +- .../graphrag/examples/custom_set_of_available_verbs/run.py | 2 +- .../examples/custom_set_of_available_workflows/run.py | 2 +- .../entity_extraction/with_graph_intelligence/run.py | 2 +- python/graphrag/examples/entity_extraction/with_nltk/run.py | 2 +- python/graphrag/examples/interdependent_workflows/run.py | 2 +- python/graphrag/examples/multiple_workflows/run.py | 2 +- python/graphrag/examples/single_verb/run.py | 2 +- python/graphrag/examples/use_built_in_workflows/run.py | 2 +- .../various_levels_of_configs/workflows_and_inputs.py | 2 +- .../workflows_and_inputs_with_custom_handlers.py | 2 +- .../examples/various_levels_of_configs/workflows_only.py | 2 +- python/graphrag/graphrag/index/cli.py | 2 +- python/graphrag/graphrag/index/run.py | 5 +++-- python/graphrag/tests/integration/_pipeline/test_run.py | 4 +--- 15 files changed, 17 insertions(+), 18 deletions(-) diff --git a/python/graphrag/examples/custom_input/run.py b/python/graphrag/examples/custom_input/run.py index 772a36b6b8..dd3bcde869 100644 --- a/python/graphrag/examples/custom_input/run.py +++ b/python/graphrag/examples/custom_input/run.py @@ -20,7 +20,7 @@ async def run(): # Grab the last result from the pipeline, should be our entity extraction outputs = [] - async for output in await run_pipeline_with_config( + async for output in run_pipeline_with_config( config_or_path=config, dataset=dataset ): outputs.append(output) diff --git a/python/graphrag/examples/custom_set_of_available_verbs/run.py b/python/graphrag/examples/custom_set_of_available_verbs/run.py index ad847a1ef2..952102764f 100644 --- a/python/graphrag/examples/custom_set_of_available_verbs/run.py +++ b/python/graphrag/examples/custom_set_of_available_verbs/run.py @@ -20,7 +20,7 @@ async def run_with_config(): ) outputs = [] - async for output in await run_pipeline_with_config( + async for output in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): outputs.append(output) diff --git a/python/graphrag/examples/custom_set_of_available_workflows/run.py b/python/graphrag/examples/custom_set_of_available_workflows/run.py index 17207cdd38..2a27df79c1 100644 --- a/python/graphrag/examples/custom_set_of_available_workflows/run.py +++ b/python/graphrag/examples/custom_set_of_available_workflows/run.py @@ -27,7 +27,7 @@ async def run_with_config(): # Grab the last result from the pipeline, should be our entity extraction tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset, additional_workflows=custom_workflows, diff --git a/python/graphrag/examples/entity_extraction/with_graph_intelligence/run.py b/python/graphrag/examples/entity_extraction/with_graph_intelligence/run.py index 94f57d6c81..631d52b4c8 100644 --- a/python/graphrag/examples/entity_extraction/with_graph_intelligence/run.py +++ b/python/graphrag/examples/entity_extraction/with_graph_intelligence/run.py @@ -37,7 +37,7 @@ async def run_with_config(): # Grab the last result from the pipeline, should be our entity extraction tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): tables.append(table) diff --git a/python/graphrag/examples/entity_extraction/with_nltk/run.py b/python/graphrag/examples/entity_extraction/with_nltk/run.py index e02acb5178..3e54381f9c 100644 --- a/python/graphrag/examples/entity_extraction/with_nltk/run.py +++ b/python/graphrag/examples/entity_extraction/with_nltk/run.py @@ -36,7 +36,7 @@ async def run_with_config(): # Grab the last result from the pipeline, should be our entity extraction tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): tables.append(table) diff --git a/python/graphrag/examples/interdependent_workflows/run.py b/python/graphrag/examples/interdependent_workflows/run.py index 603aa63738..167801f8d7 100644 --- a/python/graphrag/examples/interdependent_workflows/run.py +++ b/python/graphrag/examples/interdependent_workflows/run.py @@ -24,7 +24,7 @@ async def run_with_config(): ) tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): tables.append(table) diff --git a/python/graphrag/examples/multiple_workflows/run.py b/python/graphrag/examples/multiple_workflows/run.py index 5bc9639a02..5a2f8e5227 100644 --- a/python/graphrag/examples/multiple_workflows/run.py +++ b/python/graphrag/examples/multiple_workflows/run.py @@ -33,7 +33,7 @@ async def run_with_config(): os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml" ) - async for result in await run_pipeline_with_config(pipeline_path, dataset=dataset): + async for result in run_pipeline_with_config(pipeline_path, dataset=dataset): print(f"Workflow {result.workflow} result\n: ") print(result.result) diff --git a/python/graphrag/examples/single_verb/run.py b/python/graphrag/examples/single_verb/run.py index 501c4de65d..12ec136567 100644 --- a/python/graphrag/examples/single_verb/run.py +++ b/python/graphrag/examples/single_verb/run.py @@ -19,7 +19,7 @@ async def run_with_config(): ) tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): tables.append(table) diff --git a/python/graphrag/examples/use_built_in_workflows/run.py b/python/graphrag/examples/use_built_in_workflows/run.py index a559ef7fcc..7a344033a1 100644 --- a/python/graphrag/examples/use_built_in_workflows/run.py +++ b/python/graphrag/examples/use_built_in_workflows/run.py @@ -38,7 +38,7 @@ async def run_with_config(): # Grab the last result from the pipeline, should be our entity extraction tables = [] - async for table in await run_pipeline_with_config( + async for table in run_pipeline_with_config( config_or_path=config_path, dataset=dataset ): tables.append(table) diff --git a/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs.py b/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs.py index f0cb4fba56..f68294b276 100644 --- a/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs.py +++ b/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs.py @@ -23,7 +23,7 @@ async def main(): # run the pipeline with the config, and override the dataset with the one we just created # and grab the last result from the pipeline, should be the last workflow that was run (our nodes) tables = [] - async for table in await run_pipeline_with_config(pipeline_path): + async for table in run_pipeline_with_config(pipeline_path): tables.append(table) pipeline_result = tables[-1] diff --git a/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs_with_custom_handlers.py b/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs_with_custom_handlers.py index 59d5b27435..971ab1ec17 100644 --- a/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs_with_custom_handlers.py +++ b/python/graphrag/examples/various_levels_of_configs/workflows_and_inputs_with_custom_handlers.py @@ -37,7 +37,7 @@ async def main(): # run the pipeline with the config, and override the dataset with the one we just created # and grab the last result from the pipeline, should be the last workflow that was run (our nodes) pipeline_result = [] - async for result in await run_pipeline_with_config( + async for result in run_pipeline_with_config( pipeline_path, storage=custom_storage, callbacks=custom_reporter, diff --git a/python/graphrag/examples/various_levels_of_configs/workflows_only.py b/python/graphrag/examples/various_levels_of_configs/workflows_only.py index 91b8501a77..0da88f1861 100644 --- a/python/graphrag/examples/various_levels_of_configs/workflows_only.py +++ b/python/graphrag/examples/various_levels_of_configs/workflows_only.py @@ -40,7 +40,7 @@ async def main(): os.path.dirname(os.path.abspath(__file__)), "./pipelines/workflows_only.yml" ) tables = [] - async for table in await run_pipeline_with_config(pipeline_path, dataset=dataset): + async for table in run_pipeline_with_config(pipeline_path, dataset=dataset): tables.append(table) pipeline_result = tables[-1] diff --git a/python/graphrag/graphrag/index/cli.py b/python/graphrag/graphrag/index/cli.py index 6d85dc4644..d26b56105f 100644 --- a/python/graphrag/graphrag/index/cli.py +++ b/python/graphrag/graphrag/index/cli.py @@ -83,7 +83,7 @@ def handle_signal(signum, _): async def execute(): nonlocal encountered_errors - async for output in await run_pipeline_with_config( + async for output in run_pipeline_with_config( pipeline_config, debug=verbose, resume=resume, # type: ignore diff --git a/python/graphrag/graphrag/index/run.py b/python/graphrag/graphrag/index/run.py index ff3d059c5b..c69708782f 100644 --- a/python/graphrag/graphrag/index/run.py +++ b/python/graphrag/graphrag/index/run.py @@ -146,7 +146,7 @@ def _create_postprocess_steps( msg = "No dataset provided!" raise ValueError(msg) - return run_pipeline( + async for table in run_pipeline( workflows=workflows, dataset=dataset, storage=storage, @@ -158,7 +158,8 @@ def _create_postprocess_steps( additional_workflows=additional_workflows, progress_reporter=progress_reporter, emit=emit, - ) + ): + yield table async def run_pipeline( diff --git a/python/graphrag/tests/integration/_pipeline/test_run.py b/python/graphrag/tests/integration/_pipeline/test_run.py index e787feadb9..f0aed710c6 100644 --- a/python/graphrag/tests/integration/_pipeline/test_run.py +++ b/python/graphrag/tests/integration/_pipeline/test_run.py @@ -15,9 +15,7 @@ async def test_megapipeline(self): os.path.dirname(os.path.abspath(__file__)), "./megapipeline.yml", ) - pipeline_result = [ - gen async for gen in await run_pipeline_with_config(pipeline_path) - ] + pipeline_result = [gen async for gen in run_pipeline_with_config(pipeline_path)] errors = [] for result in pipeline_result: