mephisto.tools.scripts

Utilities that are useful for Mephisto-related scripts.

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""
Utilities that are useful for Mephisto-related scripts.
"""

from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.operations.operator import Operator
from mephisto.abstractions.databases.local_singleton_database import MephistoSingletonDB
from mephisto.utils.testing import get_mock_requester
from mephisto.utils.dirs import get_root_data_dir, get_run_file_dir
from mephisto.operations.hydra_config import (
    build_default_task_config,
    register_script_config,
    TaskConfig,
)

from omegaconf import DictConfig, OmegaConf

import functools
import hydra
import argparse
import subprocess
from typing import (
    Tuple,
    Dict,
    Any,
    Type,
    TypeVar,
    Callable,
    Optional,
    cast,
    TYPE_CHECKING,
)
import os

if TYPE_CHECKING:
    from mephisto.abstractions.database import MephistoDB


def load_db_and_process_config(
    cfg: DictConfig, print_config=False
) -> Tuple["MephistoDB", DictConfig]:
    """
    Using a Hydra DictConfig built from a TaskConfig,
    load the desired MephistoDB and
    validate the config against the database contents, then
    return the database and validated config.

    Takes in an option to print out the configuration before returning
    """
    db = get_db_from_config(cfg)
    valid_config = augment_config_from_db(cfg, db)
    if print_config:
        print(OmegaConf.to_yaml(valid_config))
    return db, valid_config


def process_config_and_get_operator(
    cfg: DictConfig, print_config=False
) -> Tuple["Operator", DictConfig]:
    """
    Using a Hydra DictConfig built from a TaskConfig,
    return an operator for that task as well as a validated config.

    Takes in an option to print out the configuration before returning
    """
    db, valid_config = load_db_and_process_config(cfg, print_config=print_config)
    return Operator(db), valid_config


TaskFunction = TypeVar("TaskFunction", bound=Callable[..., Any])


def task_script(
    config: Optional[Type[TaskConfig]] = None,
    default_config_file: Optional[str] = None,
    config_path: str = "hydra_configs",  # Override if using a different dir
) -> Callable[[TaskFunction], Any]:
    """
    Create a decorator for the main of a Mephisto task script

    Must provide one of config (a TaskConfig dataclass) or default_config_file
    (the location of a default task config hydra yaml), the former
    will be preferred.

    May specify a config_path override if not using `hydra_configs` at the
    run script location.
    """
    if config is not None:
        used_config = config
    else:
        assert (
            default_config_file is not None
        ), "Must provide one of config or default_config_file"
        used_config = build_default_task_config(default_config_file)
    register_script_config(name="taskconfig", module=used_config)

    def task_script_wrapper(script_func: TaskFunction) -> TaskFunction:
        @functools.wraps(script_func)
        def process_config_and_run_main(cfg: "DictConfig"):
            operator, cfg = process_config_and_get_operator(cfg)
            try:
                ret_val = script_func(operator, cfg)
            except Exception as e:
                raise e
            finally:
                if not operator.is_shutdown:
                    operator.shutdown()
            return ret_val

        absolute_config_path = os.path.abspath(
            os.path.join(get_run_file_dir(), config_path)
        )
        hydra_wrapper = hydra.main(
            config_path=absolute_config_path, config_name="taskconfig"
        )
        return cast(TaskFunction, hydra_wrapper(process_config_and_run_main))

    return task_script_wrapper


def get_db_from_config(cfg: DictConfig) -> "MephistoDB":
    """
    Get a MephistoDB from the given configuration. As of now
    this defaults to a LocalMephistoDB
    """
    datapath = cfg.mephisto.get("datapath", None)

    if datapath is None:
        datapath = get_root_data_dir()

    database_path = os.path.join(datapath, "database.db")

    database_type = cfg.mephisto.database._database_type

    if database_type == "local":
        return LocalMephistoDB(database_path=database_path)
    elif database_type == "singleton":
        return MephistoSingletonDB(database_path=database_path)
    else:
        raise AssertionError(f"Provided database_type {database_type} is not valid")


def augment_config_from_db(script_cfg: DictConfig, db: "MephistoDB") -> DictConfig:
    """
    Check the database for validity of the incoming MephistoConfig, ensure
    that the config has all the necessary fields set.
    """
    cfg = script_cfg.mephisto
    requester_name = cfg.provider.get("requester_name", None)
    provider_type = cfg.provider.get("_provider_type", None)
    architect_type = cfg.architect.get("_architect_type", None)

    if requester_name is None:
        if provider_type is None:
            print("No requester specified, defaulting to mock")
            provider_type = "mock"
        if provider_type == "mock":
            req = get_mock_requester(db)
            requester_name = req.requester_name
        else:
            reqs = db.find_requesters(provider_type=provider_type)
            if len(reqs) == 0:
                print(
                    f"No requesters found for provider type {provider_type}, please "
                    f"register one. You can register with `mephisto register {provider_type}`, "
                    f"or `python mephisto/client/cli.py register {provider_type}` if you haven't "
                    "installed Mephisto using poetry."
                )
                exit(1)
            elif len(reqs) == 1:
                req = reqs[0]
                requester_name = req.requester_name
                print(
                    f"Found one `{provider_type}` requester to launch with: {requester_name}"
                )
            else:
                req = reqs[-1]
                requester_name = req.requester_name
                print(
                    f"Found many `{provider_type}` requesters to launch with, "
                    f"choosing the most recent: {requester_name}"
                )
    else:
        # Ensure provided requester exists
        reqs = db.find_requesters(requester_name=requester_name)
        if len(reqs) == 0:
            print(
                f"No requesters found under name {requester_name}, "
                "have you registered with `mephisto register`?"
            )
            exit(1)
        provider_type = reqs[0].provider_type

    if provider_type in ["mturk"]:
        input(
            f"This task is going to launch live on {provider_type}, press enter to continue: "
        )
    if provider_type in ["mturk_sandbox", "mturk"] and architect_type not in [
        "heroku",
        "ec2",
    ]:
        input(
            f"This task is going to launch live on {provider_type}, but your "
            f"provided architect is {architect_type}, are you sure you "
            "want to do this? : "
        )

    cfg.provider.requester_name = requester_name
    cfg.provider._provider_type = provider_type
    return script_cfg


def build_custom_bundle(custom_src_dir):
    """Locate all of the custom files used for a custom build, create
    a prebuild directory containing all of them, then build the
    custom source.

    Check dates to only go through this build process when files have changes
    """
    IGNORE_FOLDERS = {"node_modules", "build"}

    prebuild_path = os.path.join(custom_src_dir, "webapp")

    IGNORE_FOLDERS = {os.path.join(prebuild_path, f) for f in IGNORE_FOLDERS}
    build_path = os.path.join(prebuild_path, "build", "bundle.js")

    # see if we need to rebuild
    if os.path.exists(build_path):
        created_date = os.path.getmtime(build_path)
        up_to_date = True

        for root, dirs, files in os.walk(prebuild_path):
            for igf in IGNORE_FOLDERS:
                should_ignore = False
                if igf in root:
                    should_ignore = True
            if should_ignore:
                continue
            if not up_to_date:
                break
            for fname in files:
                path = os.path.join(root, fname)
                if os.path.getmtime(path) > created_date:
                    up_to_date = False
                    break
        if up_to_date:
            return build_path

    # navigate and build
    return_dir = os.getcwd()
    os.chdir(prebuild_path)
    packages_installed = subprocess.call(["npm", "install"])
    if packages_installed != 0:
        raise Exception(
            "please make sure npm is installed, otherwise view "
            "the above error for more info."
        )

    webpack_complete = subprocess.call(["npm", "run", "dev"])
    if webpack_complete != 0:
        raise Exception(
            "Webpack appears to have failed to build your "
            "frontend. See the above error for more information."
        )

    # cleanup and return
    os.chdir(return_dir)
    return build_path
#   def load_db_and_process_config( cfg: omegaconf.dictconfig.DictConfig, print_config=False ) -> tuple[mephisto.abstractions.database.MephistoDB, omegaconf.dictconfig.DictConfig]:
View Source
def load_db_and_process_config(
    cfg: DictConfig, print_config=False
) -> Tuple["MephistoDB", DictConfig]:
    """
    Using a Hydra DictConfig built from a TaskConfig,
    load the desired MephistoDB and
    validate the config against the database contents, then
    return the database and validated config.

    Takes in an option to print out the configuration before returning
    """
    db = get_db_from_config(cfg)
    valid_config = augment_config_from_db(cfg, db)
    if print_config:
        print(OmegaConf.to_yaml(valid_config))
    return db, valid_config

Using a Hydra DictConfig built from a TaskConfig, load the desired MephistoDB and validate the config against the database contents, then return the database and validated config.

Takes in an option to print out the configuration before returning

#   def process_config_and_get_operator( cfg: omegaconf.dictconfig.DictConfig, print_config=False ) -> tuple[mephisto.operations.operator.Operator, omegaconf.dictconfig.DictConfig]:
View Source
def process_config_and_get_operator(
    cfg: DictConfig, print_config=False
) -> Tuple["Operator", DictConfig]:
    """
    Using a Hydra DictConfig built from a TaskConfig,
    return an operator for that task as well as a validated config.

    Takes in an option to print out the configuration before returning
    """
    db, valid_config = load_db_and_process_config(cfg, print_config=print_config)
    return Operator(db), valid_config

Using a Hydra DictConfig built from a TaskConfig, return an operator for that task as well as a validated config.

Takes in an option to print out the configuration before returning

#   def task_script( config: Union[Type[mephisto.operations.hydra_config.TaskConfig], NoneType] = None, default_config_file: Union[str, NoneType] = None, config_path: str = 'hydra_configs' ) -> Callable[[~TaskFunction], Any]:
View Source
def task_script(
    config: Optional[Type[TaskConfig]] = None,
    default_config_file: Optional[str] = None,
    config_path: str = "hydra_configs",  # Override if using a different dir
) -> Callable[[TaskFunction], Any]:
    """
    Create a decorator for the main of a Mephisto task script

    Must provide one of config (a TaskConfig dataclass) or default_config_file
    (the location of a default task config hydra yaml), the former
    will be preferred.

    May specify a config_path override if not using `hydra_configs` at the
    run script location.
    """
    if config is not None:
        used_config = config
    else:
        assert (
            default_config_file is not None
        ), "Must provide one of config or default_config_file"
        used_config = build_default_task_config(default_config_file)
    register_script_config(name="taskconfig", module=used_config)

    def task_script_wrapper(script_func: TaskFunction) -> TaskFunction:
        @functools.wraps(script_func)
        def process_config_and_run_main(cfg: "DictConfig"):
            operator, cfg = process_config_and_get_operator(cfg)
            try:
                ret_val = script_func(operator, cfg)
            except Exception as e:
                raise e
            finally:
                if not operator.is_shutdown:
                    operator.shutdown()
            return ret_val

        absolute_config_path = os.path.abspath(
            os.path.join(get_run_file_dir(), config_path)
        )
        hydra_wrapper = hydra.main(
            config_path=absolute_config_path, config_name="taskconfig"
        )
        return cast(TaskFunction, hydra_wrapper(process_config_and_run_main))

    return task_script_wrapper

Create a decorator for the main of a Mephisto task script

Must provide one of config (a TaskConfig dataclass) or default_config_file (the location of a default task config hydra yaml), the former will be preferred.

May specify a config_path override if not using hydra_configs at the run script location.

#   def get_db_from_config( cfg: omegaconf.dictconfig.DictConfig ) -> mephisto.abstractions.database.MephistoDB:
View Source
def get_db_from_config(cfg: DictConfig) -> "MephistoDB":
    """
    Get a MephistoDB from the given configuration. As of now
    this defaults to a LocalMephistoDB
    """
    datapath = cfg.mephisto.get("datapath", None)

    if datapath is None:
        datapath = get_root_data_dir()

    database_path = os.path.join(datapath, "database.db")

    database_type = cfg.mephisto.database._database_type

    if database_type == "local":
        return LocalMephistoDB(database_path=database_path)
    elif database_type == "singleton":
        return MephistoSingletonDB(database_path=database_path)
    else:
        raise AssertionError(f"Provided database_type {database_type} is not valid")

Get a MephistoDB from the given configuration. As of now this defaults to a LocalMephistoDB

#   def augment_config_from_db( script_cfg: omegaconf.dictconfig.DictConfig, db: mephisto.abstractions.database.MephistoDB ) -> omegaconf.dictconfig.DictConfig:
View Source
def augment_config_from_db(script_cfg: DictConfig, db: "MephistoDB") -> DictConfig:
    """
    Check the database for validity of the incoming MephistoConfig, ensure
    that the config has all the necessary fields set.
    """
    cfg = script_cfg.mephisto
    requester_name = cfg.provider.get("requester_name", None)
    provider_type = cfg.provider.get("_provider_type", None)
    architect_type = cfg.architect.get("_architect_type", None)

    if requester_name is None:
        if provider_type is None:
            print("No requester specified, defaulting to mock")
            provider_type = "mock"
        if provider_type == "mock":
            req = get_mock_requester(db)
            requester_name = req.requester_name
        else:
            reqs = db.find_requesters(provider_type=provider_type)
            if len(reqs) == 0:
                print(
                    f"No requesters found for provider type {provider_type}, please "
                    f"register one. You can register with `mephisto register {provider_type}`, "
                    f"or `python mephisto/client/cli.py register {provider_type}` if you haven't "
                    "installed Mephisto using poetry."
                )
                exit(1)
            elif len(reqs) == 1:
                req = reqs[0]
                requester_name = req.requester_name
                print(
                    f"Found one `{provider_type}` requester to launch with: {requester_name}"
                )
            else:
                req = reqs[-1]
                requester_name = req.requester_name
                print(
                    f"Found many `{provider_type}` requesters to launch with, "
                    f"choosing the most recent: {requester_name}"
                )
    else:
        # Ensure provided requester exists
        reqs = db.find_requesters(requester_name=requester_name)
        if len(reqs) == 0:
            print(
                f"No requesters found under name {requester_name}, "
                "have you registered with `mephisto register`?"
            )
            exit(1)
        provider_type = reqs[0].provider_type

    if provider_type in ["mturk"]:
        input(
            f"This task is going to launch live on {provider_type}, press enter to continue: "
        )
    if provider_type in ["mturk_sandbox", "mturk"] and architect_type not in [
        "heroku",
        "ec2",
    ]:
        input(
            f"This task is going to launch live on {provider_type}, but your "
            f"provided architect is {architect_type}, are you sure you "
            "want to do this? : "
        )

    cfg.provider.requester_name = requester_name
    cfg.provider._provider_type = provider_type
    return script_cfg

Check the database for validity of the incoming MephistoConfig, ensure that the config has all the necessary fields set.

#   def build_custom_bundle(custom_src_dir):
View Source
def build_custom_bundle(custom_src_dir):
    """Locate all of the custom files used for a custom build, create
    a prebuild directory containing all of them, then build the
    custom source.

    Check dates to only go through this build process when files have changes
    """
    IGNORE_FOLDERS = {"node_modules", "build"}

    prebuild_path = os.path.join(custom_src_dir, "webapp")

    IGNORE_FOLDERS = {os.path.join(prebuild_path, f) for f in IGNORE_FOLDERS}
    build_path = os.path.join(prebuild_path, "build", "bundle.js")

    # see if we need to rebuild
    if os.path.exists(build_path):
        created_date = os.path.getmtime(build_path)
        up_to_date = True

        for root, dirs, files in os.walk(prebuild_path):
            for igf in IGNORE_FOLDERS:
                should_ignore = False
                if igf in root:
                    should_ignore = True
            if should_ignore:
                continue
            if not up_to_date:
                break
            for fname in files:
                path = os.path.join(root, fname)
                if os.path.getmtime(path) > created_date:
                    up_to_date = False
                    break
        if up_to_date:
            return build_path

    # navigate and build
    return_dir = os.getcwd()
    os.chdir(prebuild_path)
    packages_installed = subprocess.call(["npm", "install"])
    if packages_installed != 0:
        raise Exception(
            "please make sure npm is installed, otherwise view "
            "the above error for more info."
        )

    webpack_complete = subprocess.call(["npm", "run", "dev"])
    if webpack_complete != 0:
        raise Exception(
            "Webpack appears to have failed to build your "
            "frontend. See the above error for more information."
        )

    # cleanup and return
    os.chdir(return_dir)
    return build_path

Locate all of the custom files used for a custom build, create a prebuild directory containing all of them, then build the custom source.

Check dates to only go through this build process when files have changes