diff --git a/src/instructlab/sdg/pipeline.py b/src/instructlab/sdg/pipeline.py index fb54417f..3c911679 100644 --- a/src/instructlab/sdg/pipeline.py +++ b/src/instructlab/sdg/pipeline.py @@ -8,10 +8,12 @@ import logging import math import os.path +import requests # Third Party from datasets import Dataset, concatenate_datasets from openai import OpenAI +from openai import OpenAIError import yaml # First Party @@ -203,9 +205,22 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset: output_splits.append(ds) # Store the successful result checkpointer.checkpoint(ds) # Save progress throttler.adjust_workers(success=True) # Increase workers on success - except Exception as err: - logger.error("Error in pipeline batch generation: %s", err) - throttler.adjust_workers(success=False) + + except PipelineBlockError as err: + root_exception = err.exception # Access the underlying exception + + if isinstance(root_exception, (requests.exceptions.RequestException, TimeoutError, OpenAIError)): + # Retryable errors + logger.warning("Retryable error in pipeline batch generation: %s", root_exception) + throttler.adjust_workers(success=False) + else: + # Non-retryable errors + logger.error("Non Retryable error in pipeline batch generation: %s", err) + throttler.adjust_workers(success=False) + except Exception as generic_err: + logger.error("Unexpected error in batch generation: %s", generic_err) + throttler.adjust_workers(success=False) # Adjust workers for unexpected errors + checkpointer.done() if pre_generated_data: