Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sdk): support collections of params/artifacts for component I/O. Addresses #10840 #11219

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

zazulam
Copy link
Contributor

@zazulam zazulam commented Sep 17, 2024

Description of your changes:
This PR is intended to resolve #10840.

In v1, users were able to dynamically collect and store variables of component outputs and pass references to those pipeline parameter types at compilation time to downstream components. This allowed for a dynamic pipeline generation, however with the release of v2 this capability was not available with the new compiler.

The major piece was that elements in lists & dicts did not have their types checked, so the compiler inferred their types were python primitives and attempted to pull the corresponding protobuf value.

The approach for the solution was to add separate conditions for lists and dicts and check the types on their inputs separate from the primitive catch all here

elif isinstance(input_value, (str, int, float, bool, dict, list)):
and pull out the logic for checking the different types as a function to be reused.

Here is a sample showcasing this new capability:

import kfp
from kfp.dsl import component, pipeline

@component(base_image="python:3.9",)
def create_dataset_paths(name:str, input_dfs:dict={})->dict:
    
    print(f"{name}")

    if input_dfs:
        
        print(input_dfs.items())
    
    dataset_paths = {
        'wine': 's3://my-bucket/datasets/wine_dataset.csv',
        'iris': 's3://my-bucket/datasets/iris_dataset.csv',
        'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
    }

    return dataset_paths

@component(base_image="python:3.9",)
def process_datasets(name:str, dataset_artifact: dict):
    
    for name, path in dataset_artifact.items():
        print(f"Looking at {name} dataset at S3 path: {path}")

@pipeline(name="dynamic-pipeline-example")
def dynamic_pipeline():
    fruits = {
       'apple': ['banana', 'orange'],
      'banana': ['orange'],
     'orange': [],
   }
    sorted_fruits = dict(sorted(fruits.items(), key=lambda item: len(item[1])))
    output_pool = {}
    for fruit, children in sorted_fruits.items():
        if children:
            current_task = create_dataset_paths(name=fruit, input_dfs={child:output_pool[child] for child in children})#.set_display_name(f"{fruit}-task")
        else:
            current_task = create_dataset_paths(name=fruit)#.set_display_name(f"{fruit}-task")
        current_task.set_caching_options(False)
        output_pool[fruit] = current_task.output
        process_datasets(name=fruit, dataset_artifact=current_task.output).set_caching_options(False)#.set_display_name(f"{fruit}-process")

endpoint = 'http://localhost:80'
kfp_client = kfp.client.Client(host=endpoint)
run = kfp_client.create_run_from_pipeline_func(
    dynamic_pipeline,
    arguments={},
)

image

Associated IR

components:
  comp-create-dataset-paths:
    executorLabel: exec-create-dataset-paths
    inputDefinitions:
      parameters:
        input_dfs:
          defaultValue: {}
          isOptional: true
          parameterType: STRUCT
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRUCT
  comp-create-dataset-paths-2:
    executorLabel: exec-create-dataset-paths-2
    inputDefinitions:
      parameters:
        input_dfs:
          defaultValue: {}
          isOptional: true
          parameterType: STRUCT
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRUCT
  comp-create-dataset-paths-3:
    executorLabel: exec-create-dataset-paths-3
    inputDefinitions:
      parameters:
        input_dfs:
          defaultValue: {}
          isOptional: true
          parameterType: STRUCT
        name:
          parameterType: STRING
    outputDefinitions:
      parameters:
        Output:
          parameterType: STRUCT
  comp-process-datasets:
    executorLabel: exec-process-datasets
    inputDefinitions:
      parameters:
        dataset_artifact:
          parameterType: STRUCT
        name:
          parameterType: STRING
  comp-process-datasets-2:
    executorLabel: exec-process-datasets-2
    inputDefinitions:
      parameters:
        dataset_artifact:
          parameterType: STRUCT
        name:
          parameterType: STRING
  comp-process-datasets-3:
    executorLabel: exec-process-datasets-3
    inputDefinitions:
      parameters:
        dataset_artifact:
          parameterType: STRUCT
        name:
          parameterType: STRING
deploymentSpec:
  executors:
    exec-create-dataset-paths:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - create_dataset_paths
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def create_dataset_paths(name:str, input_dfs:dict={})->dict:

                if input_dfs:

                    print(input_dfs.items())

                dataset_paths = {
                    'wine': 's3://my-bucket/datasets/wine_dataset.csv',
                    'iris': 's3://my-bucket/datasets/iris_dataset.csv',
                    'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
                }

                return dataset_paths

        image: 'python:3.9'
    exec-create-dataset-paths-2:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - create_dataset_paths
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def create_dataset_paths(name:str, input_dfs:dict={})->dict:

                if input_dfs:

                    print(input_dfs.items())

                dataset_paths = {
                    'wine': 's3://my-bucket/datasets/wine_dataset.csv',
                    'iris': 's3://my-bucket/datasets/iris_dataset.csv',
                    'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
                }

                return dataset_paths

        image: 'python:3.9'
    exec-create-dataset-paths-3:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - create_dataset_paths
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def create_dataset_paths(name:str, input_dfs:dict={})->dict:

                if input_dfs:

                    print(input_dfs.items())

                dataset_paths = {
                    'wine': 's3://my-bucket/datasets/wine_dataset.csv',
                    'iris': 's3://my-bucket/datasets/iris_dataset.csv',
                    'cancer': 's3://my-bucket/datasets/cancer_dataset.csv'
                }

                return dataset_paths

        image: 'python:3.9'
    exec-process-datasets:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - process_datasets
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def process_datasets(name:str, dataset_artifact: dict):

                for name, path in dataset_artifact.items():
                    print(f"Looking at {name} dataset at S3 path: {path}")

        image: 'python:3.9'
    exec-process-datasets-2:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - process_datasets
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def process_datasets(name:str, dataset_artifact: dict):

                for name, path in dataset_artifact.items():
                    print(f"Looking at {name} dataset at S3 path: {path}")

        image: 'python:3.9'
    exec-process-datasets-3:
      container:
        args:
          - '--executor_input'
          - '{{$}}'
          - '--function_to_execute'
          - process_datasets
        command:
          - sh
          - '-c'
          - >

            if ! [ -x "$(command -v pip)" ]; then
                python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip
            fi


            PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet
            --no-warn-script-location 'kfp==2.9.0' '--no-deps'
            'typing-extensions>=3.7.4,<5; python_version<"3.9"' && "$0" "$@"
          - sh
          - '-ec'
          - >
            program_path=$(mktemp -d)


            printf "%s" "$0" > "$program_path/ephemeral_component.py"

            _KFP_RUNTIME=true python3 -m
            kfp.dsl.executor_main                        
            --component_module_path                        
            "$program_path/ephemeral_component.py"                         "$@"
          - |+

            import kfp
            from kfp import dsl
            from kfp.dsl import *
            from typing import *

            def process_datasets(name:str, dataset_artifact: dict):

                for name, path in dataset_artifact.items():
                    print(f"Looking at {name} dataset at S3 path: {path}")

        image: 'python:3.9'
pipelineInfo:
  name: dynamic-pipeline-example
root:
  dag:
    tasks:
      create-dataset-paths:
        cachingOptions: {}
        componentRef:
          name: comp-create-dataset-paths
        inputs:
          parameters:
            name:
              runtimeValue:
                constant: orange
        taskInfo:
          name: create-dataset-paths
      create-dataset-paths-2:
        cachingOptions: {}
        componentRef:
          name: comp-create-dataset-paths-2
        dependentTasks:
          - create-dataset-paths
        inputs:
          parameters:
            input_dfs:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-dataset-paths
            name:
              runtimeValue:
                constant: banana
        taskInfo:
          name: create-dataset-paths-2
      create-dataset-paths-3:
        cachingOptions: {}
        componentRef:
          name: comp-create-dataset-paths-3
        dependentTasks:
          - create-dataset-paths
          - create-dataset-paths-2
        inputs:
          parameters:
            input_dfs:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-dataset-paths
            name:
              runtimeValue:
                constant: apple
        taskInfo:
          name: create-dataset-paths-3
      process-datasets:
        cachingOptions: {}
        componentRef:
          name: comp-process-datasets
        dependentTasks:
          - create-dataset-paths
        inputs:
          parameters:
            dataset_artifact:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-dataset-paths
            name:
              runtimeValue:
                constant: orange
        taskInfo:
          name: process-datasets
      process-datasets-2:
        cachingOptions: {}
        componentRef:
          name: comp-process-datasets-2
        dependentTasks:
          - create-dataset-paths-2
        inputs:
          parameters:
            dataset_artifact:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-dataset-paths-2
            name:
              runtimeValue:
                constant: banana
        taskInfo:
          name: process-datasets-2
      process-datasets-3:
        cachingOptions: {}
        componentRef:
          name: comp-process-datasets-3
        dependentTasks:
          - create-dataset-paths-3
        inputs:
          parameters:
            dataset_artifact:
              taskOutputParameter:
                outputParameterKey: Output
                producerTask: create-dataset-paths-3
            name:
              runtimeValue:
                constant: apple
        taskInfo:
          name: process-datasets-3
schemaVersion: 2.1.0
sdkVersion: kfp-2.9.0

Checklist:

Copy link

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign chensun for approval. For more information see the Kubernetes Code Review Process.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

Copy link

Hi @zazulam. Thanks for your PR.

I'm waiting for a kubeflow member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@zazulam
Copy link
Contributor Author

zazulam commented Sep 19, 2024

Currently, there is an issue when attempting to pull a task's output and resolve it during the downstream tasks when set_display_name is used, hence the commented out part from the sample in the PR. This seems to have been the case prior to my testing, but I'll confirm. I began looking into the backend on the issue, however it may be out of scope for this PR.

@droctothorpe
Copy link
Contributor

/ok-to-test

@rimolive
Copy link
Member

/rerun-all

@zazulam zazulam changed the title [WIP] feat(sdk): support collections of params/artifacts for component I/O. Addresses #10840 feat(sdk): support collections of params/artifacts for component I/O. Addresses #10840 Oct 8, 2024
@HumairAK HumairAK added this to the KFP 2.4.0 milestone Oct 8, 2024
Copy link
Member

@chensun chensun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a minor comment for code, but I think we should get aligned on the feature first. I have a few questions, I'll comment in the issue.

@@ -82,6 +82,220 @@ def to_protobuf_value(value: type_utils.PARAMETER_TYPES) -> struct_pb2.Value:
f'"{value}" of type "{type(value)}".')


def check_task_input_types(input_value, input_name, pipeline_task_spec, task,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By exacting the code into a function, it's hard to tell what are the actual diffs. Can you please edit in place without creating this method?

@hbelmiro
Copy link
Contributor

@zazulam can you give us the status of this PR? Is there anything blocking it?
I'm asking because it's planned for KFP 2.4.

Also, can you please link the PR to its issue?

@zazulam
Copy link
Contributor Author

zazulam commented Nov 25, 2024

@hbelmiro I should have more bandwidth within then next sprint to wrap this up. Currently, I have to update the pipelinespec.proto and have the api image reference a local build of the package on the image so I can attempt to test my changes without needing to have the spec changes pushed in a separate PR.

@hbelmiro
Copy link
Contributor

hbelmiro commented Jan 6, 2025

@zazulam any updates on this?

@zazulam
Copy link
Contributor Author

zazulam commented Jan 6, 2025

@hbelmiro Right before the new year I was able to test the appropriate changes to the spec in order to support a dictionary of pipelineparameter channels. The final piece is just adding some test cases for it. I'll make sure to push up my changes this week.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[feature] Parse Iterables for PipelineParameterChannels
6 participants