mephisto.tools.data_browser
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from mephisto.abstractions.database import MephistoDB
from mephisto.data_model.unit import Unit
from mephisto.data_model.task_run import TaskRun
from mephisto.abstractions.blueprint import AgentState
from mephisto.data_model.agent import Agent
from mephisto.data_model.worker import Worker
from mephisto.abstractions.databases.local_database import LocalMephistoDB
from mephisto.data_model.constants.assignment_state import AssignmentState

from typing import List, Optional, Any, Dict


class DataBrowser:
    """
    Class with convenience methods for getting completed data
    back from runs to parse and manage with other scripts
    """

    def __init__(self, db=None):
        """
        Store the database this browser reads from.

        Args:
            db: database object to query. When None, a new
                LocalMephistoDB() is created and used instead.
        """
        if db is None:
            db = LocalMephistoDB()
        self.db = db

    def _get_units_for_task_runs(self, task_runs: List[TaskRun]) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from all
        the provided TaskRuns.

        A unit is kept when its status is one of COMPLETED, ACCEPTED,
        REJECTED or SOFT_REJECTED. Units are returned in the order they are
        found while walking each task run's assignments.
        """
        # Built once per call instead of once per inspected unit.
        terminal_states = (
            AssignmentState.COMPLETED,
            AssignmentState.ACCEPTED,
            AssignmentState.REJECTED,
            AssignmentState.SOFT_REJECTED,
        )
        units = []
        for task_run in task_runs:
            for assignment in task_run.get_assignments():
                units.extend(
                    unit
                    for unit in assignment.get_units()
                    if unit.get_status() in terminal_states
                )
        return units

    def get_task_name_list(self) -> List[str]:
        """
        Return the task_name of every task returned by `self.db.find_tasks()`.
        """
        return [task.task_name for task in self.db.find_tasks()]

    def get_units_for_task_name(self, task_name: str) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from all
        task runs with the given task_name

        Only the first task returned by `find_tasks` for this name is used.

        Raises:
            AssertionError: if no task is found under `task_name`.
        """
        tasks = self.db.find_tasks(task_name=task_name)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if len(tasks) < 1:
            raise AssertionError(f"No task found under name {task_name}")
        task_runs = self.db.find_task_runs(task_id=tasks[0].db_id)
        return self._get_units_for_task_runs(task_runs)

    def get_units_for_run_id(self, run_id: str) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from the
        task run with the given run_id
        """
        task_run = TaskRun.get(self.db, run_id)
        return self._get_units_for_task_runs([task_run])

    def get_data_from_unit(self, unit: Unit) -> Dict[str, Any]:
        """
        Return a dict containing all data associated with the given
        unit, including its status, data, and start and end time.

        Also includes the DB ids for the worker, the unit, and the
        relevant assignment this unit was a part of.

        Raises:
            AssertionError: if the unit has no assigned agent.
        """
        agent = unit.get_assigned_agent()
        # Raised explicitly (rather than via `assert`) so an unassigned unit
        # is still reported clearly when Python runs with -O.
        if agent is None:
            raise AssertionError(
                f"Trying to get completed data from unassigned unit {unit}"
            )
        return {
            "worker_id": agent.worker_id,
            "unit_id": unit.db_id,
            "assignment_id": unit.assignment_id,
            "status": agent.db_status,
            "data": agent.state.get_parsed_data(),
            "task_start": agent.state.get_task_start(),
            "task_end": agent.state.get_task_end(),
        }

    def get_workers_with_qualification(self, qualification_name: str) -> List[Worker]:
        """
        Returns a list of 'Worker's for workers who are qualified wrt `qualification_name`.

        Only the first qualification returned by `find_qualifications` for
        this name is used, and granted qualifications are looked up with
        value=1.

        Raises:
            AssertionError: if no qualification has the given name.
        """
        qual_list = self.db.find_qualifications(qualification_name=qualification_name)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if len(qual_list) < 1:
            raise AssertionError(f"No qualification found named {qualification_name}")
        qualification_id = qual_list[0].db_id
        qualifieds = self.db.check_granted_qualifications(
            qualification_id=qualification_id, value=1
        )
        return [Worker.get(self.db, qual.worker_id) for qual in qualifieds]
View Source
class DataBrowser:
    """
    Class with convenience methods for getting completed data
    back from runs to parse and manage with other scripts
    """

    def __init__(self, db=None):
        """
        Store the database this browser reads from.

        Args:
            db: database object to query. When None, a new
                LocalMephistoDB() is created and used instead.
        """
        if db is None:
            db = LocalMephistoDB()
        self.db = db

    def _get_units_for_task_runs(self, task_runs: List[TaskRun]) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from all
        the provided TaskRuns.

        A unit is kept when its status is one of COMPLETED, ACCEPTED,
        REJECTED or SOFT_REJECTED. Units are returned in the order they are
        found while walking each task run's assignments.
        """
        # Built once per call instead of once per inspected unit.
        terminal_states = (
            AssignmentState.COMPLETED,
            AssignmentState.ACCEPTED,
            AssignmentState.REJECTED,
            AssignmentState.SOFT_REJECTED,
        )
        units = []
        for task_run in task_runs:
            for assignment in task_run.get_assignments():
                units.extend(
                    unit
                    for unit in assignment.get_units()
                    if unit.get_status() in terminal_states
                )
        return units

    def get_task_name_list(self) -> List[str]:
        """
        Return the task_name of every task returned by `self.db.find_tasks()`.
        """
        return [task.task_name for task in self.db.find_tasks()]

    def get_units_for_task_name(self, task_name: str) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from all
        task runs with the given task_name

        Only the first task returned by `find_tasks` for this name is used.

        Raises:
            AssertionError: if no task is found under `task_name`.
        """
        tasks = self.db.find_tasks(task_name=task_name)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if len(tasks) < 1:
            raise AssertionError(f"No task found under name {task_name}")
        task_runs = self.db.find_task_runs(task_id=tasks[0].db_id)
        return self._get_units_for_task_runs(task_runs)

    def get_units_for_run_id(self, run_id: str) -> List[Unit]:
        """
        Return a list of all Units in a terminal completed state from the
        task run with the given run_id
        """
        task_run = TaskRun.get(self.db, run_id)
        return self._get_units_for_task_runs([task_run])

    def get_data_from_unit(self, unit: Unit) -> Dict[str, Any]:
        """
        Return a dict containing all data associated with the given
        unit, including its status, data, and start and end time.

        Also includes the DB ids for the worker, the unit, and the
        relevant assignment this unit was a part of.

        Raises:
            AssertionError: if the unit has no assigned agent.
        """
        agent = unit.get_assigned_agent()
        # Raised explicitly (rather than via `assert`) so an unassigned unit
        # is still reported clearly when Python runs with -O.
        if agent is None:
            raise AssertionError(
                f"Trying to get completed data from unassigned unit {unit}"
            )
        return {
            "worker_id": agent.worker_id,
            "unit_id": unit.db_id,
            "assignment_id": unit.assignment_id,
            "status": agent.db_status,
            "data": agent.state.get_parsed_data(),
            "task_start": agent.state.get_task_start(),
            "task_end": agent.state.get_task_end(),
        }

    def get_workers_with_qualification(self, qualification_name: str) -> List[Worker]:
        """
        Returns a list of 'Worker's for workers who are qualified wrt `qualification_name`.

        Only the first qualification returned by `find_qualifications` for
        this name is used, and granted qualifications are looked up with
        value=1.

        Raises:
            AssertionError: if no qualification has the given name.
        """
        qual_list = self.db.find_qualifications(qualification_name=qualification_name)
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if len(qual_list) < 1:
            raise AssertionError(f"No qualification found named {qualification_name}")
        qualification_id = qual_list[0].db_id
        qualifieds = self.db.check_granted_qualifications(
            qualification_id=qualification_id, value=1
        )
        return [Worker.get(self.db, qual.worker_id) for qual in qualifieds]
Class with convenience methods for retrieving completed data from runs so that it can be parsed and managed by other scripts.
View Source
def __init__(self, db=None):
    """
    Remember the database to browse; fall back to a freshly created
    LocalMephistoDB() when no database is supplied.
    """
    self.db = LocalMephistoDB() if db is None else db
View Source
def get_task_name_list(self) -> List[str]:
    """
    Collect the `task_name` of each task that `self.db.find_tasks()`
    returns, preserving the order in which the tasks were returned.
    """
    all_tasks = self.db.find_tasks()
    return [entry.task_name for entry in all_tasks]
View Source
def get_units_for_task_name(self, task_name: str) -> List[Unit]:
    """
    Return a list of all Units in a terminal completed state from all
    task runs with the given task_name

    Only the first task returned by `find_tasks` for this name is used.

    Raises:
        AssertionError: if no task is found under `task_name`.
    """
    tasks = self.db.find_tasks(task_name=task_name)
    # Raised explicitly (rather than via `assert`) so the check is not
    # stripped when Python runs with -O.
    if len(tasks) < 1:
        raise AssertionError(f"No task found under name {task_name}")
    task_runs = self.db.find_task_runs(task_id=tasks[0].db_id)
    return self._get_units_for_task_runs(task_runs)
Return a list of all Units in a terminal completed state from all task runs with the given task_name
View Source
def get_units_for_run_id(self, run_id: str) -> List[Unit]:
    """
    Look up the task run identified by run_id and return every Unit
    from it that is in a terminal completed state.
    """
    matching_runs = [TaskRun.get(self.db, run_id)]
    return self._get_units_for_task_runs(matching_runs)
Return a list of all Units in a terminal completed state from the task run with the given run_id
View Source
def get_data_from_unit(self, unit: Unit) -> Dict[str, Any]:
    """
    Return a dict containing all data associated with the given
    unit, including its status, data, and start and end time.

    Also includes the DB ids for the worker, the unit, and the
    relevant assignment this unit was a part of.

    Raises:
        AssertionError: if the unit has no assigned agent.
    """
    agent = unit.get_assigned_agent()
    # Raised explicitly (rather than via `assert`) so an unassigned unit
    # is still reported clearly when Python runs with -O.
    if agent is None:
        raise AssertionError(
            f"Trying to get completed data from unassigned unit {unit}"
        )
    return {
        "worker_id": agent.worker_id,
        "unit_id": unit.db_id,
        "assignment_id": unit.assignment_id,
        "status": agent.db_status,
        "data": agent.state.get_parsed_data(),
        "task_start": agent.state.get_task_start(),
        "task_end": agent.state.get_task_end(),
    }
Return a dict containing all data associated with the given unit, including its status, data, and start and end time.
Also includes the DB ids for the worker, the unit, and the relevant assignment this unit was a part of.
#  
def get_workers_with_qualification(self, qualification_name: str) -> List[mephisto.data_model.worker.Worker]:
View Source
def get_workers_with_qualification(self, qualification_name: str) -> List[Worker]:
    """
    Returns a list of 'Worker's for workers who are qualified wrt `qualification_name`.

    Only the first qualification returned by `find_qualifications` for
    this name is used, and granted qualifications are looked up with
    value=1.

    Raises:
        AssertionError: if no qualification has the given name.
    """
    qual_list = self.db.find_qualifications(qualification_name=qualification_name)
    # Raised explicitly (rather than via `assert`) so the check is not
    # stripped when Python runs with -O.
    if len(qual_list) < 1:
        raise AssertionError(f"No qualification found named {qualification_name}")
    qualification_id = qual_list[0].db_id
    qualifieds = self.db.check_granted_qualifications(
        qualification_id=qualification_id, value=1
    )
    return [Worker.get(self.db, qual.worker_id) for qual in qualifieds]
Returns a list of `Worker`s for workers who are qualified with respect to `qualification_name`.