
ATHENA / OMOP dump import #59

Open · 3 of 6 tasks
tiadams opened this issue Nov 28, 2024 · 6 comments

@tiadams (Member) commented Nov 28, 2024:

  • Check in which format they provide concept data
  • Research & test how the Weaviate bulk import works
  • Research & implement parallelization of the embedding calculation (should be possible to enable GPU usage for this)
  • Write a (performant) transformation pipeline that transforms the input data into a (JSON?) format that can be bulk imported into Weaviate (see the sketch after this list)
  • Add a bulk-import function to the Weaviate Repository
  • Get data from @MarcZimmermann and test with a subset
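A minimal sketch of the transformation task mentioned above, assuming the CONCEPT file is tab-delimited and using placeholder property names that still have to be aligned with our Weaviate schema:

import pandas as pd

def concept_rows_to_objects(concept_csv_path: str, chunk_size: int = 10_000):
    """Stream the OMOP CONCEPT table in chunks and yield JSON-ready dicts for bulk import."""
    for chunk in pd.read_csv(concept_csv_path, sep="\t", dtype=str, chunksize=chunk_size):
        for row in chunk.itertuples(index=False):
            yield {
                "conceptID": row.concept_id,      # placeholder property names, to be
                "label": row.concept_name,        # matched to the Weaviate schema
                "vocabulary": row.vocabulary_id,
                "domain": row.domain_id,
                "conceptCode": row.concept_code,
            }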
@mehmetcanay (Member) commented:

Each ontology comes as a combination of 9 CSV files (a loading sketch follows the list):

  1. Concept
  2. Concept Ancestor
  3. Concept Class
  4. Concept Relationship
  5. Concept Synonym
  6. Domain
  7. Drug Strength
  8. Relationship
  9. Vocabulary
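A minimal loading sketch, assuming the files follow the usual ATHENA naming (CONCEPT.csv, CONCEPT_ANCESTOR.csv, ...) and are tab-delimited despite the .csv extension:

from pathlib import Path

import pandas as pd

# Assumed ATHENA-style file names for the nine vocabulary tables
OMOP_TABLES = [
    "CONCEPT", "CONCEPT_ANCESTOR", "CONCEPT_CLASS", "CONCEPT_RELATIONSHIP",
    "CONCEPT_SYNONYM", "DOMAIN", "DRUG_STRENGTH", "RELATIONSHIP", "VOCABULARY",
]

def load_vocabulary_dump(dump_dir: str) -> dict[str, pd.DataFrame]:
    """Load all nine OMOP vocabulary files into DataFrames keyed by table name."""
    return {
        table: pd.read_csv(Path(dump_dir) / f"{table}.csv", sep="\t", dtype=str)
        for table in OMOP_TABLES
    }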

@mehmetcanay (Member) commented:

From ChatGPT on parallelization:

import concurrent.futures
import logging

import openai


class GPT4Adapter(EmbeddingModel):

    def get_embeddings(self, messages: list[str], max_length=2048, num_workers=4) -> list:
        sanitized_messages = [self.sanitize(message) for message in messages]

        # Send one embedding request for a single chunk of texts
        def process_chunk(chunk):
            try:
                response = openai.Embedding.create(input=chunk, model=self.model_name)
                return [item["embedding"] for item in response["data"]]
            except Exception as e:
                logging.error(f"Error processing chunk: {e}")
                return [None] * len(chunk)

        # Split messages into chunks of at most max_length texts per request
        chunks = [sanitized_messages[i:i + max_length] for i in range(0, len(sanitized_messages), max_length)]

        # The requests are network-bound, so a thread pool is enough for parallelism
        chunk_results = [None] * len(chunks)
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = {executor.submit(process_chunk, chunk): idx for idx, chunk in enumerate(chunks)}
            for future in concurrent.futures.as_completed(futures):
                # Store results by chunk index so embeddings stay aligned with the input order
                chunk_results[futures[future]] = future.result()

        embeddings = [embedding for chunk_result in chunk_results for embedding in chunk_result]
        return embeddings
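One caveat with the threaded sketch above: the OpenAI embeddings endpoint is rate limited, so num_workers and the chunk size may need tuning, and failed chunks come back as None placeholders that have to be filtered out before import.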

@mehmetcanay (Member) commented:

This is for enabling GPU usage and parallelization for SentenceTransformer models:

import logging

import numpy as np
import torch
from sentence_transformers import SentenceTransformer


class MPNetAdapter(EmbeddingModel):
    def __init__(self, model_name="sentence-transformers/all-mpnet-base-v2"):
        logging.getLogger().setLevel(logging.INFO)
        # Use the GPU when available, otherwise fall back to CPU
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model = SentenceTransformer(model_name).to(self.device)
        self.model_name = model_name

    def get_embedding(self, text: str):
        logging.info(f"Getting embedding for {text}")
        try:
            # Guard against empty or missing values (None, "", NaN)
            if text is None or text == "" or (isinstance(text, float) and np.isnan(text)):
                logging.warning("Empty text passed to get_embedding")
                return None
            if isinstance(text, str):
                text = text.replace("\n", " ")
                text = self.sanitize(text)
            return self.model.encode(text, device=self.device)
        except Exception as e:
            logging.error(f"Error getting embedding for {text}: {e}")
            return None

    def get_embeddings(self, messages: list[str], batch_size=64, num_threads=4) -> list[list[float]]:
        sanitized_messages = [self.sanitize(message) for message in messages]
        try:
            # Limit CPU thread usage for torch ops (only relevant when running on CPU)
            torch.set_num_threads(num_threads)

            # Encode in batches to improve GPU utilization
            embeddings = self.model.encode(
                sanitized_messages,
                batch_size=batch_size,
                device=self.device,
                show_progress_bar=True,
            )
        except Exception as e:
            logging.error(f"Failed for messages {sanitized_messages}: {e}")
            raise e

        # Flatten to list of lists and ensure compatibility with JSON serialization
        flattened_embeddings = [[float(element) for element in row] for row in embeddings]
        return flattened_embeddings
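Note on the sketch above: torch.set_num_threads only affects CPU-side operations; when encoding on the GPU, batch_size is the main throughput lever, limited by available GPU memory.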

@mehmetcanay (Member) commented:

For batch/bulk import, the data to be imported must be in a JSON format that follows our Weaviate schema.
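A minimal sketch of such a bulk import with the Weaviate Python client (assuming the v3 client API; the instance URL, the "Concept" class name, and the object shape are placeholders that have to match our actual schema):

import weaviate

client = weaviate.Client("http://localhost:8080")  # placeholder URL

# Example objects; in the pipeline these would come from the transformation step above
concept_objects = [
    {"conceptID": "45756805", "label": "Pediatric Cardiology", "vocabulary": "ABMS"},
]

# Batch the JSON objects into Weaviate instead of inserting them one by one
client.batch.configure(batch_size=100, dynamic=True)
with client.batch as batch:
    for obj in concept_objects:
        batch.add_data_object(
            data_object=obj,
            class_name="Concept",  # hypothetical class name; must match our schema
        )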

@MarcZimmermann (Member) commented:

we have it in a nice CSV (all terminologies have already been parsed and harmonized) - shouldn't be too complicated

/home/bio/groupshare/OHDSI

concept_id      concept_name    domain_id       vocabulary_id   concept_class_id        standard_concept        concept_code    valid_start_date        valid_end_date  invalid_reason
45756805        Pediatric Cardiology    Provider        ABMS    Physician Specialty     S       OMOP4821938     19700101        20991231        
45756804        Pediatric Anesthesiology        Provider        ABMS    Physician Specialty     S       OMOP4821939     19700101        20991231        
45756803        Pathology-Anatomic / Pathology-Clinical Provider        ABMS    Physician Specialty     S       OMOP4821940     19700101        20991231

@MarcZimmermann (Member) commented:

we even have synonyms and relationships ;-)
