
View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from flask import (  # type: ignore
from geventwebsocket import (  # type: ignore
from uuid import uuid4
import time
import json
import os
from werkzeug.utils import secure_filename  # type: ignore

from threading import Event

from typing import Dict, Tuple, List, Any, Optional, TYPE_CHECKING

    from geventwebsocket.handler import Client  # type: ignore
    from geventwebsocket.websocket import WebSocket  # type: ignore

# Constants


PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE = "client_bound_live_update"
PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE = "mephisto_bound_live_update"
PACKET_TYPE_ERROR = "log_error"

SYSTEM_CHANNEL_ID = "mephisto"

FAILED_PING_TIME = 15  # seconds
ALLOWED_EXTENSIONS = {"txt", "pdf", "png", "jpg", "jpeg", "gif"}

STATUS_INIT = "none"
STATUS_CONNECTED = "connected"
STATUS_COMPLETED = "completed"
STATUS_WAITING = "waiting"
STATUS_IN_TASK = "in task"
STATUS_ONBOARDING = "onboarding"


DEBUG = False

# Main application setup
mephisto_router = Blueprint(

def debug_log(*args):
    Log only if debugging is enabled

    Explicitly does not use the regular Mephisto logging framework as we
    may want to deploy this on a server that doesn't have Mephisto installed,
    and we can keep package size low this way.
    if DEBUG:

def js_time(python_time: float) -> int:
    """Convert python time to js time, as the mephisto-core package expects"""
    return int(python_time * 1000)

# Socket and agent details
class LocalAgentState:
    Keeps track of a connected agent over their lifecycle interacting with the router

    def __init__(self, agent_id: str):
        """Initialize an object to track the lifecycle of a connection"""
        self.status = STATUS_INIT
        self.agent_id = agent_id
        self.is_alive = False
        self.disconnect_time = 0
        self.last_ping = 0
        self.unsent_messages: List[Dict[str, Any]] = []

    def __str__(self):
        return f"Agent({self.agent_id}): {self.status}"

class MephistoRouterState:
    def __init__(self):
        self.agent_id_to_client: Dict[str, "Client"] = {}
        self.client_id_to_agent: Dict[str, LocalAgentState] = {}
        self.mephisto_socket: Optional["WebSocket"] = None
        self.agent_id_to_agent: Dict[str, LocalAgentState] = {}
        self.pending_agent_requests: Dict[str, bool] = {}
        self.received_agent_responses: Dict[str, Dict[str, Any]] = {}
        self.last_mephisto_ping: float = time.time()

mephisto_router_app: Optional["MephistoRouter"] = None
mephisto_router_state: Optional["MephistoRouterState"] = None

def register_router_application(router: "MephistoRouter") -> "MephistoRouterState":
    Register a routing application with the global state,
    such that HTTP requests can access it and so that
    all websocket routers share the same state.

    Returns the global router state
    global mephisto_router_app, mephisto_router_state
    mephisto_router_app = router
    if mephisto_router_state is None:
        mephisto_router_state = MephistoRouterState()
    return mephisto_router_state

class MephistoRouter(WebSocketApplication):
    Base implementation of a websocket server that handles
    all of the socket based IO for mephisto-core

    def __init__(self, *args, **kwargs):
        """Initialize with the gloabl state of MephistoRouters"""
        super().__init__(*args, **kwargs)
        self.mephisto_state = register_router_application(self)

    def _send_message(self, socket: "WebSocket", packet: Dict[str, Any]) -> None:
        """Send the given message through the given socket"""
        if not socket:
            # We should be passing a socket, even if it's closed...
            debug_log("No socket to send packet to", packet)

        if socket.closed:
            # Socket is already closed, noop

        packet["router_outgoing_timestamp"] = time.time()

    def _find_or_create_agent(self, agent_id: str) -> "LocalAgentState":
        """Get or create an agent state for the given id"""
        state = self.mephisto_state
        agent = state.agent_id_to_agent.get(agent_id)
        if agent is None:
            agent = LocalAgentState(agent_id)
            state.agent_id_to_agent[agent_id] = agent
        return agent

    def _handle_alive(self, client: "Client", alive_packet: Dict[str, Any]) -> None:
        On alive, find out who the sender is, and register
        them as correctly here.
        state = self.mephisto_state
        if alive_packet["subject_id"] == SYSTEM_CHANNEL_ID:
            state.mephisto_socket =
            agent_id = alive_packet["subject_id"]
            agent = self._find_or_create_agent(agent_id)
            agent.is_alive = True
            state.agent_id_to_client[agent_id] = client
            state.client_id_to_agent[client.mephisto_id] = agent

            # push out backlog
            while len(agent.unsent_messages) > 0:
                packet = agent.unsent_messages.pop(0)

    def _ensure_live_connection(self, agent: LocalAgentState) -> None:
        curr_status = agent.status
        last_ping = agent.last_ping
        if last_ping == 0:
            return  # Not a live task, nothing to check
        if curr_status not in [STATUS_ONBOARDING, STATUS_WAITING, STATUS_IN_TASK]:
            return  # not in a live state, no reason to check liveliness
        if time.time() - last_ping > FAILED_PING_TIME:
            agent.status = STATUS_DISCONNECTED

    def _handle_get_agent_status(self, agent_status_packet: Dict[str, Any]) -> None:
        On a get agent status request, check the status of all agents and
        respond to the core mephisto server with the current status of each.

        May return semi-stale information, but is non-blocking
        state = self.mephisto_state
        state.last_mephisto_ping = time.time()
        agent_statuses = {}
        for agent_id in state.agent_id_to_agent.keys():
            agent = self._find_or_create_agent(agent_id)
            if not agent.is_alive and agent.status != STATUS_DISCONNECTED:
            agent_statuses[agent_id] = state.agent_id_to_agent[agent_id].status
        packet = {
            "packet_type": PACKET_TYPE_RETURN_STATUSES,
            "subject_id": SYSTEM_CHANNEL_ID,
            "data": agent_statuses,
            "client_timestamp": agent_status_packet["server_timestamp"],
            "router_incoming_timestamp": agent_status_packet[

    def _handle_update_local_status(self, status_packet: Dict[str, Any]) -> None:
        """Update the local agent status given a status packet"""
        agent_id = status_packet["subject_id"]
        agent = self._find_or_create_agent(agent_id)
        if status_packet["data"].get("status") is not None:
            agent.status = status_packet["data"]["status"]

    def _handle_forward(self, packet: Dict[str, Any]) -> None:
        """Handle forwarding the given packet to the included subject_id"""
        if packet["subject_id"] == SYSTEM_CHANNEL_ID:
            debug_log("Sending message to Mephisto", packet)
            socket = self.mephisto_state.mephisto_socket
            debug_log("Sending message to agent", packet)
            agent_id = packet["subject_id"]
            agent = self._find_or_create_agent(agent_id)
            client = self.mephisto_state.agent_id_to_client.get(agent_id)
            if client is None:
            socket =
        self._send_message(socket, packet)

    def _followup_possible_disconnect(self, agent: LocalAgentState) -> None:
        """Check to see if the given agent is disconnected"""
        if agent.disconnect_time == 0:
            return  # Agent never disconnected, isn't live
        if time.time() - agent.disconnect_time > FAILED_RECONNECT_TIME:
            agent.status = STATUS_DISCONNECTED
            debug_log("Agent disconnected", agent)

    def _send_status_for_agent(self, agent_id: str) -> None:
        Send a packet that updates the client status for the given agent,
        pushing them the most recent local status.
        agent = self._find_or_create_agent(agent_id)
        packet = {
            "packet_type": PACKET_TYPE_UPDATE_STATUS,
            "subject_id": agent_id,
            "data": {
                "status": agent.status,

    def on_open(self) -> None:
        Initialize a new client connection, and give them a uuid to refer to
        current_client =
        debug_log("Some client connected!", current_client)
        current_client.mephisto_id = str(uuid4())

    def on_message(self, message: str) -> None:
        Determine the type of message, and then handle via the correct handler
        if message is None:

        state = self.mephisto_state
        current_client =
        client = current_client
        packet = json.loads(message)
        packet["router_incoming_timestamp"] = time.time()
        if packet["packet_type"] == PACKET_TYPE_REQUEST_STATUSES:
            debug_log("Mephisto requesting status")
        elif packet["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE:
        elif packet["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE:
            debug_log("Agent action: ", packet)
            self._send_message(self.mephisto_state.mephisto_socket, packet)
        elif packet["packet_type"] == PACKET_TYPE_ERROR:
        elif packet["packet_type"] == PACKET_TYPE_ALIVE:
            debug_log("Agent alive: ", packet)
            self._handle_alive(, packet)
        elif packet["packet_type"] == PACKET_TYPE_UPDATE_STATUS:
            debug_log("Update agent status", packet)
        elif packet["packet_type"] == PACKET_TYPE_AGENT_DETAILS:
            request_id = packet["data"].get("request_id")
            if request_id is None:
                request_id = packet["subject_id"]
            res_event = state.pending_agent_requests.get(request_id)
            if res_event is not None:
                state.received_agent_responses[request_id] = packet
                del state.pending_agent_requests[request_id]
        elif packet["packet_type"] == PACKET_TYPE_HEARTBEAT:
            packet["data"] = {"last_mephisto_ping": js_time(state.last_mephisto_ping)}
            agent_id = packet["subject_id"]
            agent = state.agent_id_to_agent.get(agent_id)
            if agent is not None:
                agent.is_alive = True
                packet["data"]["status"] = agent.status
                local_client = state.agent_id_to_client.get(agent.agent_id)
                if local_client != client and local_client is not None:
                    # Not communicating to the correct socket, update
                    debug_log("Updating client for ", agent)
                    state.agent_id_to_client[agent.agent_id] = client
                    state.client_id_to_agent[client.mephisto_id] = agent
            debug_log("Unknown message", packet)

    def on_close(self, reason: Any) -> None:
        """Mark a socket dead for a LocalAgentState, give time to reconnect"""
        client =
        debug_log("Some client disconnected!", client.mephisto_id)
        agent = self.mephisto_state.client_id_to_agent.get(client.mephisto_id)
        if agent is None:
            return  # Agent not being tracked
        agent.is_alive = False
        agent.disconnect_time = time.time()

    def make_agent_request(
        self, request_packet: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Make a request to the core Mephisto server, and then await the response"""
        request_id = request_packet["data"]["request_id"]

        self.mephisto_state.pending_agent_requests[request_id] = True
        self._send_message(self.mephisto_state.mephisto_socket, request_packet)
        start_time = time.time()
        res = None
        while time.time() - start_time < 30 and res is None:
            res = self.mephisto_state.received_agent_responses.get(request_id)
        if res is not None:
            del self.mephisto_state.received_agent_responses[request_id]
        return res

@mephisto_router.route("/request_agent", methods=["POST"])
def request_agent():
    router_incoming_timestamp = time.time()
    data = request.get_json()
    request_id = str(uuid4())
    provider_data = data["provider_data"]
    assert mephisto_router_app is not None, "Must initialize router before this call"
    packet = {
        "packet_type": PACKET_TYPE_REGISTER_AGENT,
        "subject_id": request_id,
        "data": {
            "provider_data": provider_data,
            "request_id": request_id,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    res = mephisto_router_app.make_agent_request(packet)
    if res is not None:
        return jsonify(res)
        # Timed out waiting for Mephisto to respond
        return None

@mephisto_router.route("/submit_onboarding", methods=["POST"])
def submit_onboarding():
    Parse onboarding as if it were a request sent from the
    active agent, rather than coming as a request from the router.
    router_incoming_timestamp = time.time()
    data = request.get_json()
    provider_data = data["provider_data"]
    agent_id = provider_data["USED_AGENT_ID"]
    del provider_data["USED_AGENT_ID"]
    provider_data["request_id"] = str(uuid4())

    if "onboarding_data" not in provider_data:
        provider_data["onboarding_data"] = {}

    # Construct and send onboarding submission packet
    packet = {
        "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING,
        "subject_id": agent_id,
        "data": provider_data,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    res = mephisto_router_app.make_agent_request(packet)
    if res is not None:
        return jsonify(res)
        # Timed out waiting for Mephisto to respond
        return None

@mephisto_router.route("/submit_task", methods=["POST"])
def submit_task():
    """Parse task submission as if it were an act"""
    router_incoming_timestamp = time.time()
    provider_data = request.get_json()
    filenames = []
    if provider_data is None:
        # Multipart form submit
        provider_data = request.form.to_dict()
        files = request.files.to_dict()
        if len(files) > 0:
            timestamp = int(time.time())
            rand = str(uuid4())[:8]
            for filename, filepoint in files.items():
                full_name = f"{timestamp}-{rand}-{secure_filename(filename)}"
      "/tmp/", full_name))
                filenames.append({"filename": full_name})

    extracted_data = provider_data.get("final_data")
    if extracted_data is None and "final_string_data" in provider_data:
        extracted_data = json.loads(provider_data.final_string_data)

    if len(filenames) > 0:
        extracted_data.files = filenames

    agent_id = provider_data["USED_AGENT_ID"]

    packet = {
        "packet_type": PACKET_TYPE_SUBMIT_UNIT,
        "subject_id": agent_id,
        "data": extracted_data,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    return jsonify({"status": "Error log sent!"})

@mephisto_router.route("/log_error", methods=["POST"])
def log_error():
    router_incoming_timestamp = time.time()
    data = request.get_json()
    packet = {
        "packet_type": PACKET_TYPE_ERROR,
        "subject_id": data["USED_AGENT_ID"],
        "data": data["error_data"],
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    return jsonify({"status": "Error log sent!"})

@mephisto_router.route("/is_alive", methods=["GET"])
def is_alive():
    return jsonify({"status": "Alive!"})

@mephisto_router.route("/get_timestamp", methods=["GET"])
def get_timestamp():
    return jsonify({"timestamp": time.time()})

@mephisto_router.route("/download_file/<filename>", methods=["GET"])
def download_file(filename):
        return send_from_directory("/tmp/", filename)

def show_index():
        return send_from_directory("static", "index.html")

def get_task_config(res):
    args = request.args
    mephisto_core_version = args.get("mephisto_core_version")
    if mephisto_core_version != CURR_MEPHISTO_CORE_PACKAGE_VERSION:
                "packet_type": PACKET_TYPE_ERROR,
                "subject_id": SYSTEM_CHANNEL_ID,
                "data": {
                    "error_type": "version-mismatch",
                    "text": (
                        "Package `mephisto-core` version mismatch. Expected "
                        f"version {CURR_MEPHISTO_CORE_PACKAGE_VERSION} but frontend is "
                        f"currently using {mephisto_core_version}. This may "
                        "cause unexpected errors, be sure to update your "
                        "`mephisto-core` dependency with `npm install "
                        f"mephisto-core@{CURR_MEPHISTO_CORE_PACKAGE_VERSION} --save`. "
                        "If this warning still persists or the version isn't found, "
                        "please open an issue at "
        return send_from_directory("static", "task_config.json")

def get_static(res):
        return send_from_directory("static", res)

def add_header(r):
    Add headers to prevent caching, as this server may be used in local
    development or with the same address but different contents
    r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    r.headers["Pragma"] = "no-cache"
    r.headers["Expires"] = "0"
    r.headers["Cache-Control"] = "public, max-age=0"
    return r
#   def debug_log(*args):
View Source
def debug_log(*args):
    Log only if debugging is enabled

    Explicitly does not use the regular Mephisto logging framework as we
    may want to deploy this on a server that doesn't have Mephisto installed,
    and we can keep package size low this way.
    if DEBUG:

Log only if debugging is enabled

Explicitly does not use the regular Mephisto logging framework as we may want to deploy this on a server that doesn't have Mephisto installed, and we can keep package size low this way.

#   def js_time(python_time: float) -> int:
View Source
def js_time(python_time: float) -> int:
    """Convert python time to js time, as the mephisto-core package expects"""
    return int(python_time * 1000)

Convert python time to js time, as the mephisto-core package expects

#   class LocalAgentState:
View Source
class LocalAgentState:
    Keeps track of a connected agent over their lifecycle interacting with the router

    def __init__(self, agent_id: str):
        """Initialize an object to track the lifecycle of a connection"""
        self.status = STATUS_INIT
        self.agent_id = agent_id
        self.is_alive = False
        self.disconnect_time = 0
        self.last_ping = 0
        self.unsent_messages: List[Dict[str, Any]] = []

    def __str__(self):
        return f"Agent({self.agent_id}): {self.status}"

Keeps track of a connected agent over their lifecycle interacting with the router

#   LocalAgentState(agent_id: str)
View Source
    def __init__(self, agent_id: str):
        """Initialize an object to track the lifecycle of a connection"""
        self.status = STATUS_INIT
        self.agent_id = agent_id
        self.is_alive = False
        self.disconnect_time = 0
        self.last_ping = 0
        self.unsent_messages: List[Dict[str, Any]] = []

Initialize an object to track the lifecycle of a connection

#   class MephistoRouterState:
View Source
class MephistoRouterState:
    def __init__(self):
        self.agent_id_to_client: Dict[str, "Client"] = {}
        self.client_id_to_agent: Dict[str, LocalAgentState] = {}
        self.mephisto_socket: Optional["WebSocket"] = None
        self.agent_id_to_agent: Dict[str, LocalAgentState] = {}
        self.pending_agent_requests: Dict[str, bool] = {}
        self.received_agent_responses: Dict[str, Dict[str, Any]] = {}
        self.last_mephisto_ping: float = time.time()
#   MephistoRouterState()
View Source
    def __init__(self):
        self.agent_id_to_client: Dict[str, "Client"] = {}
        self.client_id_to_agent: Dict[str, LocalAgentState] = {}
        self.mephisto_socket: Optional["WebSocket"] = None
        self.agent_id_to_agent: Dict[str, LocalAgentState] = {}
        self.pending_agent_requests: Dict[str, bool] = {}
        self.received_agent_responses: Dict[str, Dict[str, Any]] = {}
        self.last_mephisto_ping: float = time.time()
View Source
def register_router_application(router: "MephistoRouter") -> "MephistoRouterState":
    Register a routing application with the global state,
    such that HTTP requests can access it and so that
    all websocket routers share the same state.

    Returns the global router state
    global mephisto_router_app, mephisto_router_state
    mephisto_router_app = router
    if mephisto_router_state is None:
        mephisto_router_state = MephistoRouterState()
    return mephisto_router_state

Register a routing application with the global state, such that HTTP requests can access it and so that all websocket routers share the same state.

Returns the global router state

#   class MephistoRouter(geventwebsocket.resource.WebSocketApplication):
View Source
class MephistoRouter(WebSocketApplication):
    Base implementation of a websocket server that handles
    all of the socket based IO for mephisto-core

    def __init__(self, *args, **kwargs):
        """Initialize with the gloabl state of MephistoRouters"""
        super().__init__(*args, **kwargs)
        self.mephisto_state = register_router_application(self)

    def _send_message(self, socket: "WebSocket", packet: Dict[str, Any]) -> None:
        """Send the given message through the given socket"""
        if not socket:
            # We should be passing a socket, even if it's closed...
            debug_log("No socket to send packet to", packet)

        if socket.closed:
            # Socket is already closed, noop

        packet["router_outgoing_timestamp"] = time.time()

    def _find_or_create_agent(self, agent_id: str) -> "LocalAgentState":
        """Get or create an agent state for the given id"""
        state = self.mephisto_state
        agent = state.agent_id_to_agent.get(agent_id)
        if agent is None:
            agent = LocalAgentState(agent_id)
            state.agent_id_to_agent[agent_id] = agent
        return agent

    def _handle_alive(self, client: "Client", alive_packet: Dict[str, Any]) -> None:
        On alive, find out who the sender is, and register
        them as correctly here.
        state = self.mephisto_state
        if alive_packet["subject_id"] == SYSTEM_CHANNEL_ID:
            state.mephisto_socket =
            agent_id = alive_packet["subject_id"]
            agent = self._find_or_create_agent(agent_id)
            agent.is_alive = True
            state.agent_id_to_client[agent_id] = client
            state.client_id_to_agent[client.mephisto_id] = agent

            # push out backlog
            while len(agent.unsent_messages) > 0:
                packet = agent.unsent_messages.pop(0)

    def _ensure_live_connection(self, agent: LocalAgentState) -> None:
        curr_status = agent.status
        last_ping = agent.last_ping
        if last_ping == 0:
            return  # Not a live task, nothing to check
        if curr_status not in [STATUS_ONBOARDING, STATUS_WAITING, STATUS_IN_TASK]:
            return  # not in a live state, no reason to check liveliness
        if time.time() - last_ping > FAILED_PING_TIME:
            agent.status = STATUS_DISCONNECTED

    def _handle_get_agent_status(self, agent_status_packet: Dict[str, Any]) -> None:
        On a get agent status request, check the status of all agents and
        respond to the core mephisto server with the current status of each.

        May return semi-stale information, but is non-blocking
        state = self.mephisto_state
        state.last_mephisto_ping = time.time()
        agent_statuses = {}
        for agent_id in state.agent_id_to_agent.keys():
            agent = self._find_or_create_agent(agent_id)
            if not agent.is_alive and agent.status != STATUS_DISCONNECTED:
            agent_statuses[agent_id] = state.agent_id_to_agent[agent_id].status
        packet = {
            "packet_type": PACKET_TYPE_RETURN_STATUSES,
            "subject_id": SYSTEM_CHANNEL_ID,
            "data": agent_statuses,
            "client_timestamp": agent_status_packet["server_timestamp"],
            "router_incoming_timestamp": agent_status_packet[

    def _handle_update_local_status(self, status_packet: Dict[str, Any]) -> None:
        """Update the local agent status given a status packet"""
        agent_id = status_packet["subject_id"]
        agent = self._find_or_create_agent(agent_id)
        if status_packet["data"].get("status") is not None:
            agent.status = status_packet["data"]["status"]

    def _handle_forward(self, packet: Dict[str, Any]) -> None:
        """Handle forwarding the given packet to the included subject_id"""
        if packet["subject_id"] == SYSTEM_CHANNEL_ID:
            debug_log("Sending message to Mephisto", packet)
            socket = self.mephisto_state.mephisto_socket
            debug_log("Sending message to agent", packet)
            agent_id = packet["subject_id"]
            agent = self._find_or_create_agent(agent_id)
            client = self.mephisto_state.agent_id_to_client.get(agent_id)
            if client is None:
            socket =
        self._send_message(socket, packet)

    def _followup_possible_disconnect(self, agent: LocalAgentState) -> None:
        """Check to see if the given agent is disconnected"""
        if agent.disconnect_time == 0:
            return  # Agent never disconnected, isn't live
        if time.time() - agent.disconnect_time > FAILED_RECONNECT_TIME:
            agent.status = STATUS_DISCONNECTED
            debug_log("Agent disconnected", agent)

    def _send_status_for_agent(self, agent_id: str) -> None:
        Send a packet that updates the client status for the given agent,
        pushing them the most recent local status.
        agent = self._find_or_create_agent(agent_id)
        packet = {
            "packet_type": PACKET_TYPE_UPDATE_STATUS,
            "subject_id": agent_id,
            "data": {
                "status": agent.status,

    def on_open(self) -> None:
        Initialize a new client connection, and give them a uuid to refer to
        current_client =
        debug_log("Some client connected!", current_client)
        current_client.mephisto_id = str(uuid4())

    def on_message(self, message: str) -> None:
        Determine the type of message, and then handle via the correct handler
        if message is None:

        state = self.mephisto_state
        current_client =
        client = current_client
        packet = json.loads(message)
        packet["router_incoming_timestamp"] = time.time()
        if packet["packet_type"] == PACKET_TYPE_REQUEST_STATUSES:
            debug_log("Mephisto requesting status")
        elif packet["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE:
        elif packet["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE:
            debug_log("Agent action: ", packet)
            self._send_message(self.mephisto_state.mephisto_socket, packet)
        elif packet["packet_type"] == PACKET_TYPE_ERROR:
        elif packet["packet_type"] == PACKET_TYPE_ALIVE:
            debug_log("Agent alive: ", packet)
            self._handle_alive(, packet)
        elif packet["packet_type"] == PACKET_TYPE_UPDATE_STATUS:
            debug_log("Update agent status", packet)
        elif packet["packet_type"] == PACKET_TYPE_AGENT_DETAILS:
            request_id = packet["data"].get("request_id")
            if request_id is None:
                request_id = packet["subject_id"]
            res_event = state.pending_agent_requests.get(request_id)
            if res_event is not None:
                state.received_agent_responses[request_id] = packet
                del state.pending_agent_requests[request_id]
        elif packet["packet_type"] == PACKET_TYPE_HEARTBEAT:
            packet["data"] = {"last_mephisto_ping": js_time(state.last_mephisto_ping)}
            agent_id = packet["subject_id"]
            agent = state.agent_id_to_agent.get(agent_id)
            if agent is not None:
                agent.is_alive = True
                packet["data"]["status"] = agent.status
                local_client = state.agent_id_to_client.get(agent.agent_id)
                if local_client != client and local_client is not None:
                    # Not communicating to the correct socket, update
                    debug_log("Updating client for ", agent)
                    state.agent_id_to_client[agent.agent_id] = client
                    state.client_id_to_agent[client.mephisto_id] = agent
            debug_log("Unknown message", packet)

    def on_close(self, reason: Any) -> None:
        """Mark a socket dead for a LocalAgentState, give time to reconnect"""
        client =
        debug_log("Some client disconnected!", client.mephisto_id)
        agent = self.mephisto_state.client_id_to_agent.get(client.mephisto_id)
        if agent is None:
            return  # Agent not being tracked
        agent.is_alive = False
        agent.disconnect_time = time.time()

    def make_agent_request(
        self, request_packet: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Make a request to the core Mephisto server, and then await the response"""
        request_id = request_packet["data"]["request_id"]

        self.mephisto_state.pending_agent_requests[request_id] = True
        self._send_message(self.mephisto_state.mephisto_socket, request_packet)
        start_time = time.time()
        res = None
        while time.time() - start_time < 30 and res is None:
            res = self.mephisto_state.received_agent_responses.get(request_id)
        if res is not None:
            del self.mephisto_state.received_agent_responses[request_id]
        return res

Base implementation of a websocket server that handles all of the socket based IO for mephisto-core

#   MephistoRouter(*args, **kwargs)
View Source
    def __init__(self, *args, **kwargs):
        """Initialize with the gloabl state of MephistoRouters"""
        super().__init__(*args, **kwargs)
        self.mephisto_state = register_router_application(self)

Initialize with the gloabl state of MephistoRouters

#   def on_open(self) -> None:
View Source
    def on_open(self) -> None:
        Initialize a new client connection, and give them a uuid to refer to
        current_client =
        debug_log("Some client connected!", current_client)
        current_client.mephisto_id = str(uuid4())

Initialize a new client connection, and give them a uuid to refer to

#   def on_message(self, message: str) -> None:
View Source
    def on_message(self, message: str) -> None:
        Determine the type of message, and then handle via the correct handler
        if message is None:

        state = self.mephisto_state
        current_client =
        client = current_client
        packet = json.loads(message)
        packet["router_incoming_timestamp"] = time.time()
        if packet["packet_type"] == PACKET_TYPE_REQUEST_STATUSES:
            debug_log("Mephisto requesting status")
        elif packet["packet_type"] == PACKET_TYPE_CLIENT_BOUND_LIVE_UPDATE:
        elif packet["packet_type"] == PACKET_TYPE_MEPHISTO_BOUND_LIVE_UPDATE:
            debug_log("Agent action: ", packet)
            self._send_message(self.mephisto_state.mephisto_socket, packet)
        elif packet["packet_type"] == PACKET_TYPE_ERROR:
        elif packet["packet_type"] == PACKET_TYPE_ALIVE:
            debug_log("Agent alive: ", packet)
            self._handle_alive(, packet)
        elif packet["packet_type"] == PACKET_TYPE_UPDATE_STATUS:
            debug_log("Update agent status", packet)
        elif packet["packet_type"] == PACKET_TYPE_AGENT_DETAILS:
            request_id = packet["data"].get("request_id")
            if request_id is None:
                request_id = packet["subject_id"]
            res_event = state.pending_agent_requests.get(request_id)
            if res_event is not None:
                state.received_agent_responses[request_id] = packet
                del state.pending_agent_requests[request_id]
        elif packet["packet_type"] == PACKET_TYPE_HEARTBEAT:
            packet["data"] = {"last_mephisto_ping": js_time(state.last_mephisto_ping)}
            agent_id = packet["subject_id"]
            agent = state.agent_id_to_agent.get(agent_id)
            if agent is not None:
                agent.is_alive = True
                packet["data"]["status"] = agent.status
                local_client = state.agent_id_to_client.get(agent.agent_id)
                if local_client != client and local_client is not None:
                    # Not communicating to the correct socket, update
                    debug_log("Updating client for ", agent)
                    state.agent_id_to_client[agent.agent_id] = client
                    state.client_id_to_agent[client.mephisto_id] = agent
            debug_log("Unknown message", packet)

Determine the type of message, and then handle via the correct handler

#   def on_close(self, reason: Any) -> None:
View Source
    def on_close(self, reason: Any) -> None:
        """Mark a socket dead for a LocalAgentState, give time to reconnect"""
        client =
        debug_log("Some client disconnected!", client.mephisto_id)
        agent = self.mephisto_state.client_id_to_agent.get(client.mephisto_id)
        if agent is None:
            return  # Agent not being tracked
        agent.is_alive = False
        agent.disconnect_time = time.time()

Mark a socket dead for a LocalAgentState, give time to reconnect

#   def make_agent_request( self, request_packet: Dict[str, Any] ) -> Union[Dict[str, Any], NoneType]:
View Source
    def make_agent_request(
        self, request_packet: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Make a request to the core Mephisto server, and then await the response"""
        request_id = request_packet["data"]["request_id"]

        self.mephisto_state.pending_agent_requests[request_id] = True
        self._send_message(self.mephisto_state.mephisto_socket, request_packet)
        start_time = time.time()
        res = None
        while time.time() - start_time < 30 and res is None:
            res = self.mephisto_state.received_agent_responses.get(request_id)
        if res is not None:
            del self.mephisto_state.received_agent_responses[request_id]
        return res

Make a request to the core Mephisto server, and then await the response

Inherited Members
@mephisto_router.route('/request_agent', methods=['POST'])
def request_agent():
View Source
@mephisto_router.route("/request_agent", methods=["POST"])
def request_agent():
    router_incoming_timestamp = time.time()
    data = request.get_json()
    request_id = str(uuid4())
    provider_data = data["provider_data"]
    assert mephisto_router_app is not None, "Must initialize router before this call"
    packet = {
        "packet_type": PACKET_TYPE_REGISTER_AGENT,
        "subject_id": request_id,
        "data": {
            "provider_data": provider_data,
            "request_id": request_id,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    res = mephisto_router_app.make_agent_request(packet)
    if res is not None:
        return jsonify(res)
        # Timed out waiting for Mephisto to respond
        return None
@mephisto_router.route('/submit_onboarding', methods=['POST'])
def submit_onboarding():
View Source
@mephisto_router.route("/submit_onboarding", methods=["POST"])
def submit_onboarding():
    Parse onboarding as if it were a request sent from the
    active agent, rather than coming as a request from the router.
    router_incoming_timestamp = time.time()
    data = request.get_json()
    provider_data = data["provider_data"]
    agent_id = provider_data["USED_AGENT_ID"]
    del provider_data["USED_AGENT_ID"]
    provider_data["request_id"] = str(uuid4())

    if "onboarding_data" not in provider_data:
        provider_data["onboarding_data"] = {}

    # Construct and send onboarding submission packet
    packet = {
        "packet_type": PACKET_TYPE_SUBMIT_ONBOARDING,
        "subject_id": agent_id,
        "data": provider_data,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    res = mephisto_router_app.make_agent_request(packet)
    if res is not None:
        return jsonify(res)
        # Timed out waiting for Mephisto to respond
        return None

Parse onboarding as if it were a request sent from the active agent, rather than coming as a request from the router.

@mephisto_router.route('/submit_task', methods=['POST'])
def submit_task():
View Source
@mephisto_router.route("/submit_task", methods=["POST"])
def submit_task():
    """Parse task submission as if it were an act"""
    router_incoming_timestamp = time.time()
    provider_data = request.get_json()
    filenames = []
    if provider_data is None:
        # Multipart form submit
        provider_data = request.form.to_dict()
        files = request.files.to_dict()
        if len(files) > 0:
            timestamp = int(time.time())
            rand = str(uuid4())[:8]
            for filename, filepoint in files.items():
                full_name = f"{timestamp}-{rand}-{secure_filename(filename)}"
      "/tmp/", full_name))
                filenames.append({"filename": full_name})

    extracted_data = provider_data.get("final_data")
    if extracted_data is None and "final_string_data" in provider_data:
        extracted_data = json.loads(provider_data.final_string_data)

    if len(filenames) > 0:
        extracted_data.files = filenames

    agent_id = provider_data["USED_AGENT_ID"]

    packet = {
        "packet_type": PACKET_TYPE_SUBMIT_UNIT,
        "subject_id": agent_id,
        "data": extracted_data,
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    return jsonify({"status": "Error log sent!"})

Parse task submission as if it were an act

@mephisto_router.route('/log_error', methods=['POST'])
def log_error():
View Source
@mephisto_router.route("/log_error", methods=["POST"])
def log_error():
    router_incoming_timestamp = time.time()
    data = request.get_json()
    packet = {
        "packet_type": PACKET_TYPE_ERROR,
        "subject_id": data["USED_AGENT_ID"],
        "data": data["error_data"],
        "client_timestamp": data["client_timestamp"],
        "router_incoming_timestamp": router_incoming_timestamp,
    return jsonify({"status": "Error log sent!"})
@mephisto_router.route('/is_alive', methods=['GET'])
def is_alive():
View Source
@mephisto_router.route("/is_alive", methods=["GET"])
def is_alive():
    return jsonify({"status": "Alive!"})
@mephisto_router.route('/get_timestamp', methods=['GET'])
def get_timestamp():
View Source
@mephisto_router.route("/get_timestamp", methods=["GET"])
def get_timestamp():
    return jsonify({"timestamp": time.time()})
@mephisto_router.route('/download_file/<filename>', methods=['GET'])
def download_file(filename):
View Source
@mephisto_router.route("/download_file/<filename>", methods=["GET"])
def download_file(filename):
        return send_from_directory("/tmp/", filename)
def show_index():
View Source
def show_index():
        return send_from_directory("static", "index.html")
def get_task_config(res):
View Source
def get_task_config(res):
    args = request.args
    mephisto_core_version = args.get("mephisto_core_version")
    if mephisto_core_version != CURR_MEPHISTO_CORE_PACKAGE_VERSION:
                "packet_type": PACKET_TYPE_ERROR,
                "subject_id": SYSTEM_CHANNEL_ID,
                "data": {
                    "error_type": "version-mismatch",
                    "text": (
                        "Package `mephisto-core` version mismatch. Expected "
                        f"version {CURR_MEPHISTO_CORE_PACKAGE_VERSION} but frontend is "
                        f"currently using {mephisto_core_version}. This may "
                        "cause unexpected errors, be sure to update your "
                        "`mephisto-core` dependency with `npm install "
                        f"mephisto-core@{CURR_MEPHISTO_CORE_PACKAGE_VERSION} --save`. "
                        "If this warning still persists or the version isn't found, "
                        "please open an issue at "
        return send_from_directory("static", "task_config.json")
def get_static(res):
View Source
def get_static(res):
        return send_from_directory("static", res)
def add_header(r):
View Source
def add_header(r):
    Add headers to prevent caching, as this server may be used in local
    development or with the same address but different contents
    r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
    r.headers["Pragma"] = "no-cache"
    r.headers["Expires"] = "0"
    r.headers["Cache-Control"] = "public, max-age=0"
    return r

Add headers to prevent caching, as this server may be used in local development or with the same address but different contents