Skip to main content
Dagster와 W&B를 사용해 MLOps 파이프라인을 오케스트레이션하고 ML 자산을 관리하세요. W&B 인테그레이션을 사용하면 Dagster 내에서 다음 작업을 쉽게 수행할 수 있습니다.
  • W&B 아티팩트를 생성하고 사용합니다.
  • W&B Registry에서 Registered Models를 사용하고 생성합니다.
  • W&B Launch를 사용해 전용 컴퓨트에서 트레이닝 작업을 실행합니다.
  • ops와 assets에서 wandb 클라이언트를 사용합니다.
W&B Dagster 인테그레이션은 W&B 전용 Dagster 리소스와 IO Manager를 제공합니다.
  • wandb_resource: W&B API 인증 및 통신에 사용하는 Dagster 리소스입니다.
  • wandb_artifacts_io_manager: W&B Artifacts를 사용하기 위한 Dagster IO Manager입니다.
다음 가이드에서는 Dagster에서 W&B를 사용하기 위한 사전 요구 사항을 충족하는 방법, ops와 assets에서 W&B Artifacts를 생성하고 사용하는 방법, W&B Launch를 사용하는 방법, 그리고 권장 모범 사례를 설명합니다.

시작하기 전에

W&B에서 Dagster를 사용하려면 다음 리소스가 필요합니다:
  1. W&B API 키.
  2. W&B entity(사용자 또는 팀): entity는 W&B Runs 및 Artifacts를 보내는 사용자 이름 또는 팀 이름입니다. run을 로그하기 전에 W&B App UI에서 계정 또는 팀 entity를 만들어 두세요. entity를 지정하지 않으면 run은 기본 entity로 전송되며, 일반적으로 사용자 이름이 기본값입니다. 기본 entity는 Settings의 Project Defaults에서 변경하세요.
  3. W&B 프로젝트: W&B Runs가 저장되는 프로젝트 이름입니다.
W&B App에서 해당 사용자 또는 팀의 profile 페이지를 확인하면 W&B entity를 찾을 수 있습니다. 기존 W&B 프로젝트를 사용하거나 새로 만들 수 있습니다. 새 프로젝트는 W&B App 홈페이지 또는 사용자/팀 profile 페이지에서 만들 수 있습니다. 프로젝트가 없으면 처음 사용할 때 자동으로 생성됩니다.

API 키 설정

  1. W&B에 로그인하세요. 참고: W&B Server를 사용하는 경우 관리자에게 인스턴스 호스트 이름을 문의하세요.
  2. User Settings에서 API 키를 생성하세요. 프로덕션 환경에서는 해당 키의 소유 주체로 service account를 사용하는 것을 권장합니다.
  3. 해당 API 키에 대한 환경 변수를 설정하세요: export WANDB_API_KEY=YOUR_KEY.
다음 예제에서는 Dagster 코드에서 API 키를 어디에 지정해야 하는지 보여줍니다. wandb_config 중첩 딕셔너리 안에 entity와 프로젝트 이름을 지정해야 합니다. 다른 W&B 프로젝트를 사용하려는 경우, 각 ops/assets에 서로 다른 wandb_config 값을 전달할 수 있습니다. 전달할 수 있는 키에 대한 자세한 내용은 아래의 설정 섹션을 참조하세요.
예제: @job용 설정
# config.yaml에 이것을 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 여기에 W&B entity를 입력하세요
     project: my_project # 여기에 W&B 프로젝트를 입력하세요


@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
       "io_manager": wandb_artifacts_io_manager,
   }
)
def simple_job_example():
   my_op()

설정

다음 설정 옵션은 이 인테그레이션에서 제공하는 W&B 전용 Dagster 리소스 및 IO Manager의 설정으로 사용됩니다.
  • wandb_resource: W&B API와 통신하는 데 사용되는 Dagster 리소스입니다. 제공된 API 키를 사용해 자동으로 인증합니다. 속성:
    • api_key: (str, required): W&B API와 통신하는 데 필요한 W&B API 키입니다.
    • host: (str, optional): 사용할 API 호스트 서버입니다. W&B Server를 사용하는 경우에만 필요합니다. 기본값은 Public Cloud 호스트인 https://api.wandb.ai입니다.
  • wandb_artifacts_io_manager: W&B Artifacts를 사용하기 위한 Dagster IO Manager입니다. 속성:
    • base_dir: (int, optional) 로컬 저장소 및 캐싱에 사용되는 기본 디렉터리입니다. W&B Artifacts와 W&B Run 로그는 해당 디렉터리에 기록되고, 해당 디렉터리에서 읽어 옵니다. 기본적으로 DAGSTER_HOME 디렉터리를 사용합니다.
    • cache_duration_in_minutes: (int, optional) W&B Artifacts와 W&B Run 로그를 로컬 저장소에 유지할 시간을 정의합니다. 해당 시간 동안 열리지 않은 파일과 디렉터리만 캐시에서 제거됩니다. 캐시 정리는 IO Manager 실행이 끝날 때 수행됩니다. 캐싱을 완전히 끄려면 0으로 설정할 수 있습니다. 캐싱은 동일한 머신에서 실행되는 작업 간에 아티팩트를 재사용할 때 속도를 높여 줍니다. 기본값은 30일입니다.
    • run_id: (str, optional): 재개에 사용되는 이 run의 고유 ID입니다. 프로젝트 내에서 고유해야 하며, run을 삭제한 후에는 해당 ID를 다시 사용할 수 없습니다. 짧고 설명적인 이름에는 name 필드를 사용하고, runs 간 비교를 위한 하이퍼파라미터 저장에는 config를 사용하세요. ID에는 다음 특수 문자를 포함할 수 없습니다: /\#?%:.. Dagster 내에서 experiment tracking을 수행하는 경우, IO Manager가 run을 재개할 수 있도록 Run ID를 설정해야 합니다. 기본적으로는 Dagster Run ID(예: 7e4df022-1bf2-44b5-a383-bb852df4077e)로 설정됩니다.
    • run_name: (str, optional) UI에서 이 run을 쉽게 파악할 수 있도록 하는 짧은 표시 이름입니다. 기본적으로 dagster-run-[8 first characters of the Dagster Run ID] 형식의 문자열이 사용됩니다. 예: dagster-run-7e4df022.
    • run_tags: (list[str], optional): UI에서 이 run의 태그 목록을 채우는 문자열 목록입니다. 태그는 runs를 함께 구성하거나 baseline 또는 production 같은 임시 레이블을 적용하는 데 유용합니다. UI에서 태그를 쉽게 추가하거나 제거할 수 있으며, 특정 태그가 있는 run만 필터링할 수도 있습니다. 이 인테그레이션에서 사용하는 모든 W&B Run에는 dagster_wandb 태그가 포함됩니다.

W&B Artifacts 사용하기

W&B Artifact와의 인테그레이션은 Dagster IO Manager를 기반으로 합니다. IO Managers는 사용자가 제공하는 객체로, asset 또는 op의 출력을 저장하고 이를 다운스트림 asset 또는 op의 입력으로 로드하는 역할을 합니다. 예를 들어, IO Manager는 파일 시스템의 파일에 객체를 저장하고 해당 파일에서 다시 로드할 수 있습니다. 이 인테그레이션은 W&B Artifacts용 IO Manager를 제공합니다. 이를 통해 모든 Dagster @op 또는 @asset이 W&B Artifacts를 네이티브하게 생성하고 사용할 수 있습니다. 다음은 Python 목록이 포함된 dataset 유형의 W&B Artifact를 생성하는 @asset의 간단한 예입니다.
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3] # 이 값은 아티팩트에 저장됩니다
@op, @asset, @multi_asset에 메타데이터 설정을 지정해 Artifacts를 기록할 수 있습니다. 또한 Dagster 외부에서 생성된 W&B Artifacts도 사용할 수 있습니다.

W&B Artifacts 작성

계속하기 전에 W&B Artifacts 사용 방법을 충분히 이해하시는 것을 권장합니다. Artifacts 가이드를 읽어보세요. Python 함수에서 객체를 반환하면 W&B Artifact를 작성할 수 있습니다. W&B는 다음 객체를 지원합니다.
  • Python 객체 (int, dict, list 등)
  • W&B 객체 (Table, Image, Graph 등)
  • W&B Artifact 객체
다음 예제에서는 Dagster asset(@asset)을 사용해 W&B Artifacts를 작성하는 방법을 보여줍니다.
pickle 모듈로 직렬화할 수 있는 모든 항목은 pickle로 직렬화되어 인테그레이션이 생성한 Artifact에 추가됩니다. Dagster 내에서 해당 Artifact를 읽으면 내용이 역직렬화됩니다(자세한 내용은 아티팩트 읽기를 참고하세요).
@asset(
    name="my_artifact",
    metadata={
        "wandb_artifact_arguments": {
            "type": "dataset",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_dataset():
    return [1, 2, 3]
W&B는 여러 Pickle 기반 직렬화 모듈(pickle, dill, cloudpickle, joblib)을 지원합니다. ONNX 또는 PMML과 같은 고급 직렬화 방식도 사용할 수 있습니다. 자세한 내용은 직렬화 섹션을 참고하세요.

설정

wandb_artifact_configuration라는 설정 딕셔너리는 @op, @asset, @multi_asset에 지정할 수 있습니다. 이 딕셔너리는 metadata로 데코레이터 인수에 전달해야 합니다. 이 설정은 W&B Artifacts에 대한 IO Manager의 읽기 및 쓰기를 제어하는 데 필요합니다. @op의 경우 Out metadata 인수를 통해 출력 metadata에 지정합니다. @asset의 경우 asset의 metadata 인수에 지정합니다. @multi_asset의 경우 AssetOut metadata 인수를 통해 각 출력 metadata에 지정합니다. 다음 코드 예제는 @op, @asset, @multi_asset 계산에 딕셔너리를 설정하는 방법을 보여줍니다.
@op 예제:
@op(
   out=Out(
       metadata={
           "wandb_artifact_configuration": {
               "name": "my_artifact",
               "type": "dataset",
           }
       }
   )
)
def create_dataset():
   return [1, 2, 3]
지원되는 속성:
  • name: (str) 이 아티팩트의 사람이 알아보기 쉬운 이름으로, UI에서 이 아티팩트를 파악하거나 use_artifact 호출에서 참조할 때 사용합니다. 이름에는 문자, 숫자, 밑줄, 하이픈, 점을 포함할 수 있습니다. 이름은 프로젝트 전체에서 고유해야 합니다. @op에는 필수입니다.
  • type: (str) 아티팩트의 유형으로, 아티팩트를 정리하고 구분하는 데 사용됩니다. 일반적인 유형에는 데이터셋이나 모델이 포함되지만, 문자, 숫자, 밑줄, 하이픈, 점을 포함하는 임의의 문자열을 사용할 수 있습니다. 출력이 이미 아티팩트가 아닌 경우 필수입니다.
  • description: (str) 아티팩트에 대한 설명을 제공하는 자유 형식 텍스트입니다. 설명은 UI에서 Markdown으로 렌더링되므로 표, 링크 등을 넣기에 좋습니다.
  • aliases: (list[str]) 아티팩트에 적용할 하나 이상의 별칭이 들어 있는 배열입니다. 인테그레이션은 설정 여부와 관계없이 해당 목록에 “latest” 태그도 추가합니다. 이는 모델과 데이터셋의 버전 관리를 위한 효과적인 방법입니다.
  • add_dirs: (list[dict[str, Any]]): 아티팩트에 포함할 각 로컬 디렉터리에 대한 설정이 들어 있는 배열입니다.
  • add_files: (list[dict[str, Any]]): 아티팩트에 포함할 각 로컬 파일에 대한 설정이 들어 있는 배열입니다.
  • add_references: (list[dict[str, Any]]): 아티팩트에 포함할 각 외부 참조에 대한 설정이 들어 있는 배열입니다.
  • serialization_module: (dict) 사용할 직렬화 모듈의 설정입니다. 자세한 내용은 Serialization 섹션을 참고하세요.
    • name: (str) 직렬화 모듈의 이름입니다. 허용되는 값: pickle, dill, cloudpickle, joblib. 이 모듈은 로컬에서 사용할 수 있어야 합니다.
    • parameters: (dict[str, Any]) 직렬화 함수에 전달되는 선택 인수입니다. 이 모듈의 dump 메서드와 동일한 매개변수를 받습니다. 예를 들어 {"compress": 3, "protocol": 4}입니다.
고급 예시:
@asset(
   name="my_advanced_artifact",
   metadata={
       "wandb_artifact_configuration": {
           "type": "dataset",
           "description": "My *Markdown* description",
           "aliases": ["my_first_alias", "my_second_alias"],
           "add_dirs": [
               {
                   "name": "My directory",
                   "local_path": "path/to/directory",
               }
           ],
           "add_files": [
               {
                   "name": "validation_dataset",
                   "local_path": "path/to/data.json",
               },
               {
                   "is_tmp": True,
                   "local_path": "path/to/temp",
               },
           ],
           "add_references": [
               {
                   "uri": "https://picsum.photos/200/300",
                   "name": "External HTTP reference to an image",
               },
               {
                   "uri": "s3://my-bucket/datasets/mnist",
                   "name": "External S3 reference",
               },
           ],
       }
   },
   io_manager_key="wandb_artifacts_manager",
)
def create_advanced_artifact():
   return [1, 2, 3]
이 asset은 인테그레이션의 양쪽 모두에서 유용한 메타데이터와 함께 구체화됩니다.
  • W&B 측: 소스 인테그레이션 이름과 버전, 사용된 Python 버전, pickle 프로토콜 버전 등
  • Dagster 측:
    • Dagster Run ID
    • W&B Run: ID, 이름, 경로, URL
    • W&B Artifact: ID, 이름, 유형, 버전, 크기, URL
    • W&B Entity
    • W&B Project
다음 이미지는 Dagster asset에 추가된 W&B 메타데이터를 보여줍니다. 이 정보는 인테그레이션을 통해 Dagster로 전파됩니다.
W&B 프로젝트 및 run 참조를 포함한 W&B 메타데이터가 연결된 asset 상세 뷰가 표시된 Dagster UI
다음 이미지는 제공된 설정이 W&B Artifact에서 유용한 메타데이터로 어떻게 보강되었는지 보여줍니다. 이 정보는 재현성과 유지 관리에 도움이 됩니다. 인테그레이션이 없으면 이 정보는 사용할 수 없습니다.
Dagster에서 보강된 설정 메타데이터가 표시된 W&B Artifact 페이지
Dagster의 추가 설정 세부정보가 표시된 W&B Artifact 메타데이터 패널
Dagster에서 추가로 보강한 설정 메타데이터 필드가 포함된 W&B Artifact 뷰
mypy와 같은 정적 유형 검사기를 사용하는 경우, 다음과 같이 설정 유형 정의 객체를 임포트하세요.
from dagster_wandb import WandbArtifactConfiguration

파티션 사용하기

이 인테그레이션은 Dagster 파티션을 기본적으로 지원합니다. 다음은 DailyPartitionsDefinition을 사용한 파티션 예시입니다.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01", end_date="2023-02-01"),
    name="my_daily_partitioned_asset",
    compute_kind="wandb",
    metadata={
        "wandb_artifact_configuration": {
            "type": "dataset",
        }
    },
)
def create_my_daily_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    context.log.info(f"파티션된 에셋 생성 중: {partition_key}")
    return random.randint(0, 100)
이 코드는 각 파티션에 대해 W&B 아티팩트 하나를 생성합니다. 파티션 키가 덧붙은 asset 이름 아래의 Artifact 패널(UI)에서 아티팩트를 확인하세요. 예를 들어 my_daily_partitioned_asset.2023-01-01, my_daily_partitioned_asset.2023-01-02, 또는 my_daily_partitioned_asset.2023-01-03입니다. 여러 차원으로 파티셔닝된 asset은 각 차원을 점(.)으로 구분된 형식으로 표시합니다. 예를 들어 my_asset.car.blue입니다.
이 인테그레이션에서는 하나의 run 안에서 여러 파티션을 머티리얼라이즈할 수 없습니다. asset을 머티리얼라이즈하려면 여러 run을 수행해야 합니다. asset을 머티리얼라이즈할 때는 Dagit에서 이를 실행할 수 있습니다.
파티셔닝된 asset에 대해 여러 run이 표시된 Dagster UI, 각 파티션은 별도의 run으로 표시됨

고급 사용법

W&B Artifacts 조회

W&B Artifacts를 조회하는 방법은 이를 쓰는 것과 비슷합니다. wandb_artifact_configuration라는 설정 딕셔너리를 @op 또는 @asset에 설정할 수 있습니다. 차이점은 설정을 출력이 아니라 입력에 지정해야 한다는 점뿐입니다. @op의 경우, In metadata 인수를 통해 입력 메타데이터에 설정합니다. 아티팩트 이름을 명시적으로 전달해야 합니다. @asset의 경우, Asset In metadata 인수를 통해 입력 메타데이터에 설정합니다. 상위 asset의 이름과 일치해야 하므로 아티팩트 이름은 전달하지 않아야 합니다. 인테그레이션 외부에서 생성된 아티팩트에 종속되도록 하려면 SourceAsset을 사용해야 합니다. 그러면 항상 해당 asset의 최신 버전을 조회합니다. 다음 예제는 다양한 op에서 아티팩트를 조회하는 방법을 보여줍니다.
@op에서 아티팩트 조회
@op(
   ins={
       "artifact": In(
           metadata={
               "wandb_artifact_configuration": {
                   "name": "my_artifact",
               }
           }
       )
   },
   io_manager_key="wandb_artifacts_manager"
)
def read_artifact(context, artifact):
   context.log.info(artifact)

설정

앞서 설명한 설정은 IO Manager가 데코레이터가 적용된 함수의 입력으로 무엇을 수집하고 제공할지 지정하는 데 사용됩니다. 다음 조회 패턴을 지원합니다.
  1. 아티팩트에 포함된 이름이 지정된 객체를 가져오려면 get을 사용하세요:
@asset(
   ins={
       "table": AssetIn(
           key="my_artifact_with_table",
           metadata={
               "wandb_artifact_configuration": {
                   "get": "my_table",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_table(context, table):
   context.log.info(table.get_column("a"))
  1. 아티팩트에 포함된 다운로드된 파일의 로컬 경로를 조회하려면 get_path를 사용하세요:
@asset(
   ins={
       "path": AssetIn(
           key="my_artifact_with_file",
           metadata={
               "wandb_artifact_configuration": {
                   "get_path": "name_of_file",
               }
           },
           input_manager_key="wandb_artifacts_manager",
       )
   }
)
def get_path(context, path):
   context.log.info(path)
  1. 전체 아티팩트 객체를 가져오려면(콘텐츠가 로컬에 다운로드된 상태로):
@asset(
   ins={
       "artifact": AssetIn(
           key="my_artifact",
           input_manager_key="wandb_artifacts_manager",
       )
   },
)
def get_artifact(context, artifact):
   context.log.info(artifact.name)
지원되는 속성
  • get: (str) 아티팩트의 상대 이름에 해당하는 W&B 객체를 조회합니다.
  • get_path: (str) 아티팩트의 상대 이름에 해당하는 파일 경로를 조회합니다.

직렬화 설정

기본적으로 이 인테그레이션은 표준 pickle 모듈을 사용하지만, 일부 객체는 이 모듈과 호환되지 않습니다. 예를 들어 yield를 사용하는 함수는 pickle하려고 하면 오류가 발생합니다. dill, cloudpickle, joblib 같은 추가적인 Pickle 기반 직렬화 모듈도 지원합니다. 직렬화된 문자열을 반환하거나 아티팩트를 직접 생성해 ONNX 또는 PMML과 같은 더 고급 직렬화 방식을 사용할 수도 있습니다. 어떤 방식을 선택할지는 사용 사례에 따라 다르므로, 이 주제에 대해서는 관련 문헌을 참고하세요.

Pickle 기반 직렬화 모듈

Pickle은 보안에 취약한 것으로 알려져 있습니다. 보안이 중요하다면 W&B 객체만 사용하세요. 데이터에 서명하고 해시 키는 자체 시스템에 저장하는 것을 권장합니다. 더 복잡한 사용 사례가 있다면 언제든지 문의해 주세요. 기꺼이 도와드리겠습니다.
wandb_artifact_configurationserialization_module 딕셔너리에서 사용할 직렬화 방식을 설정할 수 있습니다. Dagster를 실행하는 머신에서 해당 모듈을 사용할 수 있는지 확인하세요. 인테그레이션은 해당 아티팩트를 조회할 때 어떤 직렬화 모듈을 사용해야 하는지 자동으로 인식합니다. 현재 지원되는 모듈은 pickle, dill, cloudpickle, joblib입니다. 다음은 joblib으로 직렬화된 “모델”을 생성한 다음 이를 추론에 사용하는 간단한 예입니다.
@asset(
    name="my_joblib_serialized_model",
    compute_kind="Python",
    metadata={
        "wandb_artifact_configuration": {
            "type": "model",
            "serialization_module": {
                "name": "joblib"
            },
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def create_model_serialized_with_joblib():
    # 실제 ML 모델은 아니지만 pickle 모듈로는 이 작업이 불가능합니다
    return lambda x, y: x + y

@asset(
    name="inference_result_from_joblib_serialized_model",
    compute_kind="Python",
    ins={
        "my_joblib_serialized_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        )
    },
    metadata={
        "wandb_artifact_configuration": {
            "type": "results",
        }
    },
    io_manager_key="wandb_artifacts_manager",
)
def use_model_serialized_with_joblib(
    context: OpExecutionContext, my_joblib_serialized_model
):
    inference_result = my_joblib_serialized_model(1, 2)
    context.log.info(inference_result)  # 출력: 3
    return inference_result

고급 직렬화 형식 (ONNX, PMML)

ONNX 및 PMML과 같은 교환 파일 형식을 사용하는 경우가 많습니다. 이 인테그레이션은 이러한 형식을 지원하지만, Pickle 기반 직렬화보다 약간 더 많은 작업이 필요합니다. 이러한 형식을 사용하는 방법은 두 가지입니다.
  1. 모델을 선택한 형식으로 변환한 다음, 일반 Python 객체인 것처럼 해당 형식의 문자열 표현을 반환하세요. 인테그레이션이 해당 문자열을 pickle로 직렬화합니다. 그런 다음 그 문자열을 사용해 모델을 다시 복원할 수 있습니다.
  2. 직렬화된 모델이 포함된 새 로컬 파일을 만든 다음, add_file 설정을 사용해 해당 파일로 맞춤형 아티팩트를 구축하세요.
다음은 Scikit-learn 모델을 ONNX로 직렬화하는 예입니다.
import numpy
import onnxruntime as rt
from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

from dagster import AssetIn, AssetOut, asset, multi_asset

@multi_asset(
    compute_kind="Python",
    outs={
        "my_onnx_model": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "model",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetOut(
            metadata={
                "wandb_artifact_configuration": {
                    "type": "test_set",
                }
            },
            io_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def create_onnx_model():
    # Inspired from https://onnx.ai/sklearn-onnx/

    # Train a model.
    iris = load_iris()
    X, y = iris.data, iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    clr = RandomForestClassifier()
    clr.fit(X_train, y_train)

    # Convert into ONNX format
    initial_type = [("float_input", FloatTensorType([None, 4]))]
    onx = convert_sklearn(clr, initial_types=initial_type)

    # Write artifacts (model + test_set)
    return onx.SerializeToString(), {"X_test": X_test, "y_test": y_test}

@asset(
    name="experiment_results",
    compute_kind="Python",
    ins={
        "my_onnx_model": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
        "my_test_set": AssetIn(
            input_manager_key="wandb_artifacts_manager",
        ),
    },
    group_name="onnx_example",
)
def use_onnx_model(context, my_onnx_model, my_test_set):
    # https://onnx.ai/sklearn-onnx/ 참고

    # ONNX Runtime으로 예측값 계산
    sess = rt.InferenceSession(my_onnx_model)
    input_name = sess.get_inputs()[0].name
    label_name = sess.get_outputs()[0].name
    pred_onx = sess.run(
        [label_name], {input_name: my_test_set["X_test"].astype(numpy.float32)}
    )[0]
    context.log.info(pred_onx)
    return pred_onx

파티션 사용

이 인테그레이션은 Dagster partitions를 기본적으로 지원합니다. 아티팩트의 특정 파티션 하나, 여러 파티션 또는 모든 파티션을 선택적으로 조회할 수 있습니다. 모든 파티션은 딕셔너리 형태로 제공되며, 여기서 키와 값은 각각 파티션 키와 아티팩트 콘텐츠를 나타냅니다.
업스트림 @asset의 모든 파티션을 딕셔너리 형태로 조회합니다. 이 딕셔너리에서 키와 값은 각각 파티션 키와 아티팩트 콘텐츠에 해당합니다.
@asset(
    compute_kind="wandb",
    ins={"my_daily_partitioned_asset": AssetIn()},
    output_required=False,
)
def read_all_partitions(context, my_daily_partitioned_asset):
    for partition, content in my_daily_partitioned_asset.items():
        context.log.info(f"partition={partition}, content={content}")
설정 객체인 metadata는 프로젝트에서 W&B가 서로 다른 아티팩트 파티션과 상호작용하는 방식을 정의합니다. metadata 객체에는 wandb_artifact_configuration이라는 키가 있으며, 그 안에 partitions라는 중첩 객체가 포함됩니다. partitions 객체는 각 파티션의 이름을 해당 설정에 매핑합니다. 각 파티션의 설정에서는 해당 파티션에서 데이터를 조회하는 방법을 지정할 수 있습니다. 이러한 설정에는 각 파티션의 요구 사항에 따라 get, version, alias와 같은 서로 다른 키가 포함될 수 있습니다. 설정 키
  1. get: get 키는 데이터를 가져올 W&B 객체(Table, Image…)의 이름을 지정합니다.
  2. version: version 키는 아티팩트의 특정 버전을 가져오려는 경우 사용합니다.
  3. alias: alias 키를 사용하면 별칭으로 아티팩트를 가져올 수 있습니다.
와일드카드 설정 와일드카드 "*"는 설정되지 않은 모든 파티션을 의미합니다. 즉, partitions 객체에 명시적으로 언급되지 않은 파티션에 대한 기본 설정을 제공합니다. 예를 들면,
"*": {
    "get": "default_table_name",
},
이 설정은 명시적으로 설정되지 않은 모든 파티션의 데이터를 default_table_name이라는 이름의 테이블에서 가져온다는 의미입니다. 특정 파티션 설정 특정 파티션의 키를 사용해 해당 파티션별 설정을 지정하면 와일드카드 설정을 재정의할 수 있습니다. 예를 들면 다음과 같습니다.
"yellow": {
    "get": "custom_table_name",
},
이 설정은 yellow라는 이름의 파티션에 대해, 와일드카드 설정을 재정의하고 custom_table_name라는 이름의 테이블에서 데이터를 가져오도록 한다는 의미입니다. 버전 관리 및 별칭 지정 버전 관리 및 별칭 지정을 위해 설정에서 특정 versionalias 키를 지정할 수 있습니다. 버전의 경우,
"orange": {
    "version": "v0",
},
이 설정은 orange 아티팩트 파티션의 v0 버전에서 데이터를 불러옵니다. 별칭의 경우,
"blue": {
    "alias": "special_alias",
},
이 설정은 별칭 special_alias가 지정된 아티팩트 파티션의 테이블 default_table_name에서 데이터를 가져옵니다(설정에서는 blue로 지칭됨).

고급 사용

인테그레이션의 고급 사용에 대해서는 다음 전체 코드 예제를 참고하세요:

W&B Launch 사용하기

현재 활발히 개발 중인 베타 제품입니다. Launch에 관심이 있으신가요? W&B Launch 고객 파일럿 프로그램 참여에 대해 상담하려면 담당 계정팀에 문의하세요. 파일럿 고객으로 베타 프로그램에 참여하려면 AWS EKS 또는 SageMaker를 사용해야 합니다. 향후에는 추가 플랫폼도 지원할 예정입니다.
계속하기 전에 W&B Launch 사용 방법을 충분히 이해하는 것을 권장합니다. Launch 가이드를 읽어보세요. Dagster 인테그레이션은 다음과 같은 작업에 도움이 됩니다:
  • Dagster 인스턴스에서 하나 이상의 Launch 에이전트 실행
  • Dagster 인스턴스 내에서 로컬 Launch 작업 실행
  • 온프레미스 또는 클라우드에서 원격 Launch 작업 실행

Launch 에이전트

이 인테그레이션은 임포트할 수 있는 run_launch_agent라는 @op를 제공합니다. 이 @op는 Launch Agent를 시작하고, 수동으로 중지할 때까지 장기 실행 프로세스로 계속 실행합니다. 에이전트는 Launch 큐를 폴링하여 작업을 순서대로 실행하거나, 실행되도록 외부 서비스에 디스패치하는 프로세스입니다. Launch 페이지를 참고하세요. Launchpad에서는 모든 속성에 대한 유용한 설명도 확인할 수 있습니다.
Dagster 인테그레이션용 에이전트 설정 옵션과 설명이 포함된 W&B Launchpad 인터페이스
단순한 예
# config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 본인의 W&B entity로 교체하세요
     project: my_project # 본인의 W&B 프로젝트로 교체하세요
ops:
 run_launch_agent:
   config:
     max_jobs: -1
     queues: 
       - my_dagster_queue

from dagster_wandb.launch.ops import run_launch_agent
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource

@job(
   resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_agent_example():
   run_launch_agent()

Launch 작업

이 인테그레이션은 임포트 가능한 @oprun_launch_job을 제공합니다. 이 @op은 Launch 작업을 실행합니다. Launch 작업을 실행하려면 먼저 큐에 부여해야 합니다. 큐를 생성하거나 기본 큐를 사용할 수 있습니다. 해당 큐를 수신하는 활성 에이전트가 있는지 확인하세요. Dagster 인스턴스 내에서 에이전트를 실행할 수도 있고, Kubernetes에서 배포 가능한 에이전트 사용도 고려할 수 있습니다. Launch 페이지를 참고하세요. Launchpad에서는 모든 속성에 대한 유용한 설명도 확인할 수 있습니다.
Dagster 인테그레이션용 작업 설정 옵션과 설명이 표시된 W&B Launchpad 인터페이스
간단한 예시
# config.yaml에 추가하세요
# 또는 Dagit의 Launchpad나 JobDefinition.execute_in_process에서 설정할 수 있습니다
# 레퍼런스: https://docs.dagster.io/concepts/configuration/config-schema#specifying-runtime-configuration
resources:
 wandb_config:
   config:
     entity: my_entity # 본인의 W&B entity로 교체하세요
     project: my_project # 본인의 W&B 프로젝트로 교체하세요
ops:
 my_launched_job:
   config:
     entry_point:
       - python
       - train.py
     queue: my_dagster_queue
     uri: https://github.com/wandb/example-dagster-integration-with-launch


from dagster_wandb.launch.ops import run_launch_job
from dagster_wandb.resources import wandb_resource

from dagster import job, make_values_resource


@job(resource_defs={
       "wandb_config": make_values_resource(
           entity=str,
           project=str,
       ),
       "wandb_resource": wandb_resource.configured(
           {"api_key": {"env": "WANDB_API_KEY"}}
       ),
   },
)
def run_launch_job_example():
   run_launch_job.alias("my_launched_job")() # 별칭으로 작업 이름을 변경합니다

모범 사례

  1. IO Manager를 사용해 아티팩트를 조회하고 쓰세요. 아티팩트.download() 또는 Run.log_artifact()를 직접 사용하지 마세요. 이러한 메서드는 인테그레이션에서 처리됩니다. 대신 아티팩트에 저장하려는 데이터를 반환하고, 나머지는 인테그레이션이 처리하도록 하세요. 이 방식은 아티팩트의 리니지를 더 잘 제공합니다.
  2. 복잡한 사용 사례에서만 아티팩트 객체를 직접 생성하세요. Python 객체와 W&B 객체는 ops/assets에서 반환해야 합니다. 인테그레이션이 아티팩트 번들링을 처리합니다. 복잡한 사용 사례에서는 Dagster 작업에서 아티팩트를 직접 생성할 수 있습니다. 소스 인테그레이션 이름과 버전, 사용한 Python 버전, pickle 프로토콜 버전 등의 메타데이터를 보강할 수 있도록 아티팩트 객체를 인테그레이션에 전달하는 것을 권장합니다.
  3. 메타데이터를 통해 아티팩트에 파일, 디렉터리, 외부 참조를 추가하세요. 인테그레이션 wandb_artifact_configuration 객체를 사용해 파일, 디렉터리 또는 외부 참조(Amazon S3, GCS, HTTP…)를 추가하세요. 자세한 내용은 아티팩트 설정 section의 고급 예제를 참조하세요.
  4. 아티팩트가 생성될 때는 @op 대신 @asset을 사용하세요. 아티팩트는 asset입니다. Dagster가 해당 asset을 관리하는 경우 asset을 사용하는 것이 좋습니다. 이렇게 하면 Dagit Asset Catalog에서 더 나은 가시성을 얻을 수 있습니다.
  5. Dagster 외부에서 생성된 아티팩트를 사용하려면 SourceAsset을 사용하세요. 이렇게 하면 인테그레이션을 활용해 외부에서 생성된 아티팩트를 읽을 수 있습니다. 그렇지 않으면 인테그레이션에서 생성한 아티팩트만 사용할 수 있습니다.
  6. 대규모 모델 트레이닝을 전용 컴퓨트에서 오케스트레이션하려면 W&B Launch를 사용하세요. 작은 모델은 Dagster cluster 내에서 트레이닝할 수 있고, GPU 노드가 있는 Kubernetes cluster에서 Dagster를 실행할 수도 있습니다. 대규모 모델 트레이닝에는 W&B Launch를 사용하는 것을 권장합니다. 이렇게 하면 인스턴스 과부하를 방지하고 더 적절한 컴퓨트에 액세스할 수 있습니다.
  7. Dagster 내에서 experiment tracking을 수행할 때는 W&B Run ID를 Dagster Run ID 값으로 설정하세요. Run을 재개 가능하게 설정하고 W&B Run ID를 Dagster Run ID 또는 원하는 문자열로 설정하는 두 가지를 모두 권장합니다. 이 권장 사항을 따르면 Dagster 내에서 모델을 트레이닝할 때 W&B 메트릭과 W&B 아티팩트가 동일한 W&B Run에 저장됩니다.
W&B Run ID를 Dagster Run ID로 설정하세요.
wandb.init(
    id=context.run_id,
    resume="allow",
    ...
)
또는 직접 W&B Run ID를 지정해 IO Manager 설정에 전달하세요.
wandb.init(
    id="my_resumable_run_id",
    resume="allow",
    ...
)

@job(
   resource_defs={
       "io_manager": wandb_artifacts_io_manager.configured(
           {"wandb_run_id": "my_resumable_run_id"}
       ),
   }
)
  1. 대용량 W&B 아티팩트의 경우 get 또는 get_path를 사용해 필요한 데이터만 가져오세요. 기본적으로 인테그레이션은 아티팩트 전체를 다운로드합니다. 매우 큰 아티팩트를 사용하는 경우에는 필요한 특정 파일이나 객체만 가져오는 것이 좋습니다. 이렇게 하면 속도와 리소스 사용량이 향상됩니다.
  2. Python 객체의 경우 사용 사례에 맞게 pickling 모듈을 조정하세요. 기본적으로 W&B 인테그레이션은 표준 pickle 모듈을 사용합니다. 하지만 일부 객체는 이 모듈과 호환되지 않습니다. 예를 들어, yield를 사용하는 함수는 pickle하려고 하면 오류가 발생합니다. W&B는 다른 Pickle 기반 직렬화 모듈(dill, cloudpickle, joblib)도 지원합니다.
직렬화된 문자열을 반환하거나 아티팩트를 직접 생성하여 ONNX 또는 PMML 같은 더 고급 직렬화 방식을 사용할 수도 있습니다. 적절한 선택은 사용 사례에 따라 달라지므로, 이 주제에 관한 관련 문헌을 참고하세요.