Skip to content

Commit

Permalink
Migration notebook (#1492)
Browse files Browse the repository at this point in the history
* Add migration notebook

* Update migration instructions

* Semver

* Rename item in relationships table

* Remove indexing vector store shim

* Remove query shims

* Remove columns from migrated data

* Format

* Add community parents
  • Loading branch information
natoverse authored Dec 10, 2024
1 parent 1a13e0f commit 61816e0
Show file tree
Hide file tree
Showing 21 changed files with 310 additions and 324 deletions.
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20241209225934573225.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Add migration notebook."
}
39 changes: 6 additions & 33 deletions docs/examples_notebooks/drift_search.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
Expand Down Expand Up @@ -207,9 +204,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"\n",
"print(f\"Entity count: {len(entity_df)}\")\n",
"entity_df.head()\n",
Expand Down Expand Up @@ -270,37 +264,16 @@
}
],
"source": [
"def embed_community_reports(\n",
"def read_community_reports(\n",
" input_dir: str,\n",
" embedder: OpenAIEmbedding,\n",
" community_report_table: str = COMMUNITY_REPORT_TABLE,\n",
"):\n",
" \"\"\"Embeds the full content of the community reports and saves the DataFrame with embeddings to the output path.\"\"\"\n",
" input_path = Path(input_dir) / f\"{community_report_table}.parquet\"\n",
" output_path = Path(input_dir) / f\"{community_report_table}_with_embeddings.parquet\"\n",
"\n",
" if not Path(output_path).exists():\n",
" print(\"Embedding file not found. Computing community report embeddings...\")\n",
"\n",
" report_df = pd.read_parquet(input_path)\n",
"\n",
" if \"full_content\" not in report_df.columns:\n",
" error_msg = f\"'full_content' column not found in {input_path}\"\n",
" raise ValueError(error_msg)\n",
"\n",
" report_df[\"full_content_embeddings\"] = report_df.loc[:, \"full_content\"].apply(\n",
" lambda x: embedder.embed(x)\n",
" )\n",
"\n",
" # Save the DataFrame with embeddings to the output path\n",
" report_df.to_parquet(output_path)\n",
" print(f\"Embeddings saved to {output_path}\")\n",
" return report_df\n",
" print(f\"Embeddings file already exists at {output_path}\")\n",
" return pd.read_parquet(output_path)\n",
" return pd.read_parquet(input_path)\n",
"\n",
"\n",
"report_df = embed_community_reports(INPUT_DIR, text_embedder)\n",
"report_df = read_community_reports(INPUT_DIR)\n",
"reports = read_indexer_reports(\n",
" report_df,\n",
" entity_df,\n",
Expand All @@ -321,7 +294,7 @@
" entities=entities,\n",
" relationships=relationships,\n",
" reports=reports,\n",
" entity_text_embeddings=entity_description_embeddings,\n",
" entity_text_embeddings=description_embedding_store,\n",
" text_units=text_units,\n",
")\n",
"\n",
Expand Down Expand Up @@ -3172,7 +3145,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "graphrag-ta_-cxM1-py3.10",
"display_name": ".venv",
"language": "python",
"name": "python3"
},
Expand All @@ -3186,7 +3159,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.11.9"
}
},
"nbformat": 4,
Expand Down
263 changes: 263 additions & 0 deletions docs/examples_notebooks/index_migration.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Copyright (c) 2024 Microsoft Corporation.\n",
"# Licensed under the MIT License."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Index Migration\n",
"\n",
"This notebook is used to maintain data model parity with older indexes for the latest versions of GraphRAG. If you have a pre-1.0 index and need to migrate without re-running the entire pipeline, you can use this notebook to only update the pieces necessary for alignment.\n",
"\n",
"NOTE: we recommend regenerating your settings.yml with the latest version of GraphRAG using `graphrag init`. Copy your LLM settings into it before running this notebook. This ensures your config is aligned with the latest version for the migration. This also ensures that you have default vector store config, which is now required or indexing will fail.\n",
"\n",
"WARNING: This will overwrite your parquet files, you may want to make a backup!"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# This is the directory that has your settings.yml\n",
"# NOTE: much older indexes may have been output with a timestamped directory\n",
"# if this is the case, you will need to make sure the storage.base_dir in settings.yml points to it correctly\n",
"PROJECT_DIRECTORY = \"<your project directory>\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"from graphrag.config.load_config import load_config\n",
"from graphrag.config.resolve_path import resolve_paths\n",
"from graphrag.index.create_pipeline_config import create_pipeline_config\n",
"from graphrag.storage.factory import create_storage\n",
"\n",
"# This first block does some config loading, path resolution, and translation that is normally done by the CLI/API when running a full workflow\n",
"config = load_config(Path(PROJECT_DIRECTORY))\n",
"resolve_paths(config)\n",
"pipeline_config = create_pipeline_config(config)\n",
"storage = create_storage(pipeline_config.storage)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def remove_columns(df, columns):\n",
" \"\"\"Remove columns from a DataFrame, suppressing errors.\"\"\"\n",
" df.drop(labels=columns, axis=1, errors=\"ignore\", inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [],
"source": [
"def get_community_parent(nodes):\n",
" \"\"\"Compute the parent community using the node membership as a lookup.\"\"\"\n",
" parent_mapping = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
" nodes = nodes.loc[:, [\"level\", \"community\", \"title\"]]\n",
"\n",
" # Create a parent mapping by adding 1 to the level column\n",
" parent_mapping[\"level\"] += 1 # Shift levels for parent relationship\n",
" parent_mapping.rename(columns={\"community\": \"parent\"}, inplace=True)\n",
"\n",
" # Merge the parent information back into the base DataFrame\n",
" nodes = nodes.merge(parent_mapping, on=[\"level\", \"title\"], how=\"left\")\n",
"\n",
" # Fill missing parents with -1 (default value)\n",
" nodes[\"parent\"] = nodes[\"parent\"].fillna(-1).astype(int)\n",
"\n",
" join = (\n",
" nodes.groupby([\"community\", \"level\", \"parent\"])\n",
" .agg({\"title\": list})\n",
" .reset_index()\n",
" )\n",
" return join[join[\"community\"] > -1].loc[:, [\"community\", \"parent\"]]"
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [],
"source": [
"from uuid import uuid4\n",
"\n",
"from graphrag.utils.storage import load_table_from_storage, write_table_to_storage\n",
"\n",
"# First we'll go through any parquet files that had model changes and update them\n",
"# The new data model may have removed excess columns as well, but we will only make the minimal changes required for compatibility\n",
"\n",
"final_documents = await load_table_from_storage(\n",
" \"create_final_documents.parquet\", storage\n",
")\n",
"final_text_units = await load_table_from_storage(\n",
" \"create_final_text_units.parquet\", storage\n",
")\n",
"final_entities = await load_table_from_storage(\"create_final_entities.parquet\", storage)\n",
"final_nodes = await load_table_from_storage(\"create_final_nodes.parquet\", storage)\n",
"final_relationships = await load_table_from_storage(\n",
" \"create_final_relationships.parquet\", storage\n",
")\n",
"final_communities = await load_table_from_storage(\n",
" \"create_final_communities.parquet\", storage\n",
")\n",
"final_community_reports = await load_table_from_storage(\n",
" \"create_final_community_reports.parquet\", storage\n",
")\n",
"\n",
"\n",
"# Documents renames raw_content for consistency\n",
"if \"raw_content\" in final_documents.columns:\n",
" final_documents.rename(columns={\"raw_content\": \"text\"}, inplace=True)\n",
"final_documents[\"human_readable_id\"] = final_documents.index + 1\n",
"\n",
"# Text units just get a human_readable_id or consistency\n",
"final_text_units[\"human_readable_id\"] = final_text_units.index + 1\n",
"\n",
"# We renamed \"name\" to \"title\" for consistency with the rest of the tables\n",
"if \"name\" in final_entities.columns:\n",
" final_entities.rename(columns={\"name\": \"title\"}, inplace=True)\n",
"remove_columns(\n",
" final_entities, [\"mname_embedding\", \"graph_embedding\", \"description_embedding\"]\n",
")\n",
"\n",
"# Final nodes uses community for joins, which is now an int everywhere\n",
"final_nodes[\"community\"] = final_nodes[\"community\"].fillna(-1)\n",
"final_nodes[\"community\"] = final_nodes[\"community\"].astype(int)\n",
"remove_columns(\n",
" final_nodes,\n",
" [\n",
" \"type\",\n",
" \"description\",\n",
" \"source_id\",\n",
" \"graph_embedding\",\n",
" \"entity_type\",\n",
" \"top_level_node_id\",\n",
" \"size\",\n",
" ],\n",
")\n",
"\n",
"# Relationships renames \"rank\" to \"combined_degree\" to be clear what the default ranking is\n",
"if \"rank\" in final_relationships.columns:\n",
" final_relationships.rename(columns={\"rank\": \"combined_degree\"}, inplace=True)\n",
"\n",
"\n",
"# Compute the parents for each community, to add to communities and reports\n",
"parent_df = get_community_parent(final_nodes)\n",
"\n",
"# Communities previously used the \"id\" field for the Leiden id, but we've moved this to the community field and use a uuid for id like the others\n",
"if \"community\" not in final_communities.columns:\n",
" final_communities[\"community\"] = final_communities[\"id\"].astype(int)\n",
" final_communities[\"human_readable_id\"] = final_communities[\"community\"]\n",
" final_communities[\"id\"] = [str(uuid4()) for _ in range(len(final_communities))]\n",
"if \"parent\" not in final_communities.columns:\n",
" final_communities = final_communities.merge(parent_df, on=\"community\", how=\"left\")\n",
"remove_columns(final_communities, [\"raw_community\"])\n",
"\n",
"# We need int for community and the human_readable_id copy for consistency\n",
"final_community_reports[\"community\"] = final_community_reports[\"community\"].astype(int)\n",
"final_community_reports[\"human_readable_id\"] = final_community_reports[\"community\"]\n",
"if \"parent\" not in final_community_reports.columns:\n",
" final_community_reports = final_community_reports.merge(\n",
" parent_df, on=\"community\", how=\"left\"\n",
" )\n",
"\n",
"await write_table_to_storage(final_documents, \"create_final_documents.parquet\", storage)\n",
"await write_table_to_storage(\n",
" final_text_units, \"create_final_text_units.parquet\", storage\n",
")\n",
"await write_table_to_storage(final_entities, \"create_final_entities.parquet\", storage)\n",
"await write_table_to_storage(final_nodes, \"create_final_nodes.parquet\", storage)\n",
"await write_table_to_storage(\n",
" final_relationships, \"create_final_relationships.parquet\", storage\n",
")\n",
"await write_table_to_storage(\n",
" final_communities, \"create_final_communities.parquet\", storage\n",
")\n",
"await write_table_to_storage(\n",
" final_community_reports, \"create_final_community_reports.parquet\", storage\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from datashaper import NoopVerbCallbacks\n",
"\n",
"from graphrag.cache.factory import create_cache\n",
"from graphrag.index.flows.generate_text_embeddings import generate_text_embeddings\n",
"\n",
"# We only need to re-run the embeddings workflow, to ensure that embeddings for all required search fields are in place\n",
"# We'll construct the context and run this function flow directly to avoid everything else\n",
"\n",
"workflow = next(\n",
" (x for x in pipeline_config.workflows if x.name == \"generate_text_embeddings\"), None\n",
")\n",
"config = workflow.config\n",
"text_embed = config.get(\"text_embed\", {})\n",
"embedded_fields = config.get(\"embedded_fields\", {})\n",
"callbacks = NoopVerbCallbacks()\n",
"cache = create_cache(pipeline_config.cache, PROJECT_DIRECTORY)\n",
"\n",
"await generate_text_embeddings(\n",
" final_documents=None,\n",
" final_relationships=None,\n",
" final_text_units=final_text_units,\n",
" final_entities=final_entities,\n",
" final_community_reports=final_community_reports,\n",
" callbacks=callbacks,\n",
" cache=cache,\n",
" storage=storage,\n",
" text_embed_config=text_embed,\n",
" embedded_fields=embedded_fields,\n",
" snapshot_embeddings_enabled=False,\n",
")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
6 changes: 0 additions & 6 deletions docs/examples_notebooks/local_search.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
Expand Down Expand Up @@ -287,9 +284,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"\n",
"print(f\"Entity count: {len(entity_df)}\")\n",
"entity_df.head()"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@
" read_indexer_reports,\n",
" read_indexer_text_units,\n",
")\n",
"from graphrag.query.input.loaders.dfs import (\n",
" store_entity_semantic_embeddings,\n",
")\n",
"from graphrag.query.llm.oai.chat_openai import ChatOpenAI\n",
"from graphrag.query.llm.oai.embedding import OpenAIEmbedding\n",
"from graphrag.query.llm.oai.typing import OpenaiApiType\n",
Expand Down Expand Up @@ -302,9 +299,6 @@
" collection_name=\"default-entity-description\",\n",
")\n",
"description_embedding_store.connect(db_uri=LANCEDB_URI)\n",
"entity_description_embeddings = store_entity_semantic_embeddings(\n",
" entities=entities, vectorstore=description_embedding_store\n",
")\n",
"covariate_df = pd.read_parquet(f\"{INPUT_DIR}/{COVARIATE_TABLE}.parquet\")\n",
"claims = read_indexer_covariates(covariate_df)\n",
"covariates = {\"claims\": claims}\n",
Expand Down
Loading

0 comments on commit 61816e0

Please sign in to comment.