mephisto.client.review.review_server

View Source
#!/usr/bin/env python3

# Copyright (c) Meta Platforms and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

from flask import Flask, Blueprint, send_file, jsonify, request  # type: ignore
from datetime import datetime
import os
import atexit
import signal
import csv
import sys
import time
import threading
import urllib.parse
import collections
import math


def run(
    build_dir,
    port,
    output,
    csv_headers,
    json=False,
    database_task_name=None,
    all_data=False,
    debug=False,
    assets_dir=None,
):
    """Start the ``mephisto review`` Flask server and block until it stops.

    Items to review come from ``stdin`` (CSV rows, or one JSON document per
    line when ``json`` is true) or, when ``database_task_name`` is given, from
    the units of that task in the local Mephisto database.

    Two modes are supported:

    * "OBO" (one-by-one, the default): a background thread publishes one item
      at a time; each submitted review releases the next item. Once the input
      is exhausted, the next data request shuts the server down.
    * "ALL" (``all_data=True``): every item is loaded into memory before the
      server starts and can be fetched by id, filtered and paginated.

    Args:
        build_dir: directory of the built review UI; ``index.html`` and
            ``static/`` are served from it.
        port: port the server listens on.
        output: file that reviews are appended to, or ``""`` to print them to
            stdout.
        csv_headers: when reading CSV, skip the first row.
        json: parse stdin as JSON lines instead of CSV.
        database_task_name: if not None, read from the Mephisto DB instead of
            stdin.
        all_data: run in "ALL" mode instead of "OBO" mode.
        debug: werkzeug logging and the Flask banner are disabled unless
            ``debug`` is true AND ``output`` is a file.
        assets_dir: optional directory served under ``/assets``.

    Side effects:
        Sets the module globals ``app`` and ``finished``, plus
        ``ready_for_next``/``current_data``/``counter`` in OBO mode or
        ``all_data_list``/``datalist_update_time`` in ALL mode.
    """
    global app
    global ready_for_next, current_data, finished, counter
    global all_data_list, datalist_update_time

    RESULTS_PER_PAGE_DEFAULT = 10
    # Max age (seconds) of the in-memory list before it is re-read from the
    # Mephisto DB (ALL mode with database_task_name only).
    TIMEOUT_IN_SECONDS = 300
    USE_TIMEOUT = True
    MODE = "ALL" if all_data else "OBO"
    RESULT_SUCCESS = "SUCCESS"

    DataQueryResult = collections.namedtuple(
        "DataQueryResult", ["data_list", "total_pages"]
    )

    class InvalidQueryError(ValueError):
        """Invalid pagination parameters passed to ``consume_all_data``."""

    if not debug or output == "":
        # disable noisy logging of flask, https://stackoverflow.com/a/18379764
        import logging

        flask_log = logging.getLogger("werkzeug")
        flask_log.disabled = True
        flask_cli = sys.modules["flask.cli"]
        flask_cli.show_server_banner = lambda *x: None

    app = Flask(
        __name__,
        root_path=os.getcwd(),
        static_url_path="/static",
        static_folder=build_dir + "/static",
    )

    if assets_dir:
        assets_blueprint = Blueprint(
            "additional_assets",
            __name__,
            static_url_path="/assets",
            static_folder=assets_dir,
        )
        app.register_blueprint(assets_blueprint)

    def json_reader(f):
        """Yield one parsed JSON value per non-blank line of ``f``."""
        import json

        for jsonline in f:
            # Blank lines (e.g. a trailing empty line) are not valid JSON.
            if jsonline.strip():
                yield json.loads(jsonline)

    def mephistoDBReader():
        """Yield ``get_data_from_unit`` for each unit of ``database_task_name``."""
        from mephisto.abstractions.databases.local_database import LocalMephistoDB
        from mephisto.tools.data_browser import DataBrowser as MephistoDataBrowser

        db = LocalMephistoDB()
        mephisto_data_browser = MephistoDataBrowser(db=db)

        units = mephisto_data_browser.get_units_for_task_name(database_task_name)
        for unit in units:
            yield mephisto_data_browser.get_data_from_unit(unit)

    def make_data_source():
        """Return an iterator over the raw review items for the configured input.

        When reading CSV with ``csv_headers`` the header row is consumed here;
        ``next`` is given a default so empty input yields nothing instead of
        raising StopIteration.
        """
        if database_task_name is not None:
            return mephistoDBReader()
        stdin_lines = iter(sys.stdin.readline, "")
        if json:
            return json_reader(stdin_lines)
        reader = csv.reader(stdin_lines)
        if csv_headers:
            next(reader, None)
        return reader

    def load_all_data(data_source):
        """ALL mode: replace ``all_data_list`` with every item of ``data_source``.

        Each entry is ``{"data": item, "id": position}`` with 0-based positions.
        The list is built completely before it is published, so concurrent
        requests never observe a half-filled list.
        """
        global all_data_list, datalist_update_time
        all_data_list = [
            {"data": row, "id": position} for position, row in enumerate(data_source)
        ]
        datalist_update_time = datetime.now()

    def refresh_all_list_data():
        """ALL mode with a DB source: re-read the Mephisto DB so new units appear."""
        load_all_data(mephistoDBReader())

    def consume_data():
        """OBO mode: background-thread loop that publishes one item at a time.

        For each item a fresh ``ready_for_next`` event is installed, the item
        is exposed through ``current_data``/``counter`` and the thread blocks
        until a review submission sets the event. ``finished`` becomes True
        once the source is exhausted.
        """
        global ready_for_next, current_data, finished, counter

        data_source = make_data_source()
        finished = False
        counter = 0
        for row in data_source:
            ready_for_next = threading.Event()
            current_data = row
            counter += 1
            ready_for_next.wait()
        finished = True

    def consume_all_data(page, results_per_page=RESULTS_PER_PAGE_DEFAULT, filters=None):
        """ALL mode: return a filtered and optionally paginated view of the data.

        Params:
            page: 1-indexed page number; any non-int (e.g. None) disables
                pagination and returns every matching item.
            results_per_page: maximum number of results per page.
            filters: list of keywords/sentences; an item matches when every one
                occurs (case-insensitively) in ``str(item["data"])``. Anything
                other than a list disables filtering.
        Returns:
            A DataQueryResult(data_list, total_pages); ``total_pages`` is 1
            when not paginated.
        Raises:
            InvalidQueryError: for a non-positive page or results_per_page.
        """
        paginated = type(page) is int
        if paginated:
            # Explicit raises rather than ``assert``, which ``python -O`` strips.
            if page <= 0:
                raise InvalidQueryError(
                    "Page number should be a positive 1 indexed integer."
                )
            if type(results_per_page) is not int or results_per_page <= 0:
                raise InvalidQueryError("results_per_page should be a positive integer")

        if database_task_name is not None:
            # stdin is consumed once at startup, so only a DB-backed list can
            # be refreshed to pick up new entries.
            now = datetime.now()
            if (
                USE_TIMEOUT
                and (now - datalist_update_time).total_seconds() > TIMEOUT_IN_SECONDS
            ):
                refresh_all_list_data()

        data_list = all_data_list
        if type(filters) is list:
            terms = [word.lower() for word in filters]

            def matches(item):
                text = str(item["data"]).lower()
                return all(term in text for term in terms)

            data_list = [item for item in data_list if matches(item)]

        if not paginated:
            return DataQueryResult(data_list, 1)

        total_pages = math.ceil(len(data_list) / results_per_page)
        first_index = (page - 1) * results_per_page
        # Slicing past the end yields [], which covers out-of-range pages.
        return DataQueryResult(
            data_list[first_index : first_index + results_per_page], total_pages
        )

    def shutdown_server():
        """Ask the werkzeug development server to stop after this request.

        NOTE(review): the ``werkzeug.server.shutdown`` environ hook was removed
        in Werkzeug 2.1; on such versions this raises RuntimeError (the request
        fails) and the server keeps running.
        """
        func = request.environ.get("werkzeug.server.shutdown")
        if func is None:
            raise RuntimeError("Not running with the Werkzeug Server")
        func()

    def write_review(text):
        """Print ``text`` to stdout when ``output`` is "", else append it to ``output``."""
        if output == "":
            print(text)
            sys.stdout.flush()
        else:
            # Explicit encoding: reviews may contain non-ASCII text, which the
            # platform default encoding cannot always represent.
            with open(output, "a+", encoding="utf-8") as f:
                f.write(text + "\n")

    @app.route("/data_for_current_task")
    def data():
        """
        *** DEPRECATED ***
        For use in "one-by-one" or default mode.
        Returns the item currently being reviewed (as published by consume_data).
        If there is no more data to review the server is shut down.
        """
        if all_data:
            return jsonify(
                {
                    "error": 'mephisto review is in all mode, please get data by sending a GET request to "/data/:id"'
                }
            )
        if finished:
            shutdown_server()

        return jsonify(
            {"finished": finished, "data": current_data if not finished else None}
        )

    @app.route("/submit_current_task", methods=["GET", "POST"])
    def next_task():
        """
        *** DEPRECATED ***
        For use in "one-by-one" or default mode.
        Records a review and releases the next item. The review is the JSON
        body of a POST or the ``result`` query parameter of a GET; its ``str()``
        is written to the configured output.
        """
        if all_data:
            return jsonify(
                {
                    "error": 'mephisto review is in all mode, please submit reviews by sending a POST request to "/data/:id"'
                }
            )
        result = (
            request.get_json(force=True)
            if request.method == "POST"
            else request.args.get("result")
        )
        write_review("{}".format(result))

        ready_for_next.set()
        # Yield so the reader thread gets a chance to advance before the
        # response reports ``finished``/``counter``.
        time.sleep(0)
        return jsonify({"finished": finished, "counter": counter})

    @app.route("/data/<id>", methods=["GET", "POST"])
    def task_data_by_id(id):
        """
        Get (GET) or review (POST) the item with the given 0-based id.

        GET, ALL mode: returns the item at that position of the review list.
        GET, OBO mode: only the id of the current item (``counter - 1``) is
        accepted; once the input is exhausted the server is shut down.
        POST: the JSON body is written as ``ID: <id>, REVIEW: <body>``; in OBO
        mode this also releases the next item (the id is not checked).
        """
        # isdecimal (not isdigit): every decimal character is accepted by
        # int(), whereas isdigit also admits e.g. superscripts that int() rejects.
        id = int(id) if type(id) is int or (type(id) is str and id.isdecimal()) else None
        if request.method == "GET":
            if all_data:
                if id is None or id < 0 or id >= len(all_data_list):
                    return jsonify(
                        {"error": f"Data with ID: {id} does not exist", "mode": MODE}
                    )
                return jsonify({"data": all_data_list[id], "mode": MODE})
            if id is None or id != counter - 1:
                return jsonify(
                    {
                        "error": f"Please review the data point with ID: {counter - 1}",
                        "mode": MODE,
                    }
                )
            data = {
                "data": current_data if not finished else None,
                "id": counter - 1,
            }
            if finished:
                shutdown_server()
            return jsonify({"finished": finished, "data": data, "mode": MODE})

        review = request.get_json(force=True)
        write_review("ID: {}, REVIEW: {}".format(id, review))
        if not all_data:
            ready_for_next.set()
            time.sleep(0)
        return jsonify({"result": RESULT_SUCCESS, "finished": finished, "mode": MODE})

    @app.route("/data")
    def all_task_data():
        """
        ALL mode: returns the (filtered, optionally paginated) list of items.
        OBO mode: returns the id and data of the item currently under review;
        the id is its 0-based position in the input.
        Params:
            page: 1 indexed page number for results
            results_per_page: number of results to show per page, must be positive integer
            filters: string of keywords or sentences results must contain.
                Filters must be comma separated and spaces must be denoted by '%20'
        """
        if not all_data:
            data = {"data": current_data if not finished else None, "id": counter - 1}
            return jsonify({"data": data, "mode": MODE, "finished": finished})

        page = request.args.get("page", default=None, type=int)
        results_per_page = request.args.get(
            "results_per_page", default=RESULTS_PER_PAGE_DEFAULT, type=int
        )
        filters_str = request.args.get("filters", default=None, type=str)
        filters = None
        if type(filters_str) is str:
            # NOTE(review): Flask has already percent-decoded query values;
            # this extra unquote decodes any remaining escapes and is kept for
            # compatibility with existing clients.
            filters = [
                filt.strip() for filt in urllib.parse.unquote(filters_str).split(",")
            ]
        try:
            data = consume_all_data(page, results_per_page, filters)
        except InvalidQueryError as err:
            print(f"Error: {err.args[0]}")
            sys.stdout.flush()
            return jsonify({"error": err.args[0], "mode": MODE})
        return jsonify(
            {"data": data.data_list, "mode": MODE, "total_pages": data.total_pages}
        )

    @app.route("/", defaults={"id": None})
    @app.route("/<id>")
    def index(id):
        """Serve the single-page review UI for ``/`` and every ``/<id>`` path."""
        return send_file(build_dir + "/index.html")

    @app.after_request
    def after_request(response):
        """Add permissive CORS headers and disable caching on every response."""
        response.headers.add("Access-Control-Allow-Origin", "*")
        response.headers.add(
            "Access-Control-Allow-Headers", "Content-Type,Authorization"
        )
        response.headers.add(
            "Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS"
        )
        response.headers.add("Cache-Control", "no-store")
        return response

    if all_data:
        # ALL mode: every item is loaded into memory before the app starts.
        load_all_data(make_data_source())
        finished = False
    else:
        # Give the routes defined values until the reader thread publishes
        # the first item (it may block on stdin, or find the input empty).
        current_data = None
        counter = 0
        finished = False
        ready_for_next = threading.Event()
        # daemon=True: the reader may be blocked on stdin or on
        # ready_for_next.wait(); it must not keep the process alive after the
        # server stops (e.g. on CTRL+C).
        thread = threading.Thread(
            target=consume_data, name="review-server-thread", daemon=True
        )
        thread.start()
    print("Running on http://127.0.0.1:{}/ (Press CTRL+C to quit)".format(port))
    sys.stdout.flush()
    app.run(debug=False, port=port)
#   def run( build_dir, port, output, csv_headers, json=False, database_task_name=None, all_data=False, debug=False, assets_dir=None ):
View Source
def run(
    build_dir,
    port,
    output,
    csv_headers,
    json=False,
    database_task_name=None,
    all_data=False,
    debug=False,
    assets_dir=None,
):
    """Start the ``mephisto review`` Flask server and block until it stops.

    Items to review come from ``stdin`` (CSV rows, or one JSON document per
    line when ``json`` is true) or, when ``database_task_name`` is given, from
    the units of that task in the local Mephisto database.

    Two modes are supported:

    * "OBO" (one-by-one, the default): a background thread publishes one item
      at a time; each submitted review releases the next item. Once the input
      is exhausted, the next data request shuts the server down.
    * "ALL" (``all_data=True``): every item is loaded into memory before the
      server starts and can be fetched by id, filtered and paginated.

    Args:
        build_dir: directory of the built review UI; ``index.html`` and
            ``static/`` are served from it.
        port: port the server listens on.
        output: file that reviews are appended to, or ``""`` to print them to
            stdout.
        csv_headers: when reading CSV, skip the first row.
        json: parse stdin as JSON lines instead of CSV.
        database_task_name: if not None, read from the Mephisto DB instead of
            stdin.
        all_data: run in "ALL" mode instead of "OBO" mode.
        debug: werkzeug logging and the Flask banner are disabled unless
            ``debug`` is true AND ``output`` is a file.
        assets_dir: optional directory served under ``/assets``.

    Side effects:
        Sets the module globals ``app`` and ``finished``, plus
        ``ready_for_next``/``current_data``/``counter`` in OBO mode or
        ``all_data_list``/``datalist_update_time`` in ALL mode.
    """
    global app
    global ready_for_next, current_data, finished, counter
    global all_data_list, datalist_update_time

    RESULTS_PER_PAGE_DEFAULT = 10
    # Max age (seconds) of the in-memory list before it is re-read from the
    # Mephisto DB (ALL mode with database_task_name only).
    TIMEOUT_IN_SECONDS = 300
    USE_TIMEOUT = True
    MODE = "ALL" if all_data else "OBO"
    RESULT_SUCCESS = "SUCCESS"

    DataQueryResult = collections.namedtuple(
        "DataQueryResult", ["data_list", "total_pages"]
    )

    class InvalidQueryError(ValueError):
        """Invalid pagination parameters passed to ``consume_all_data``."""

    if not debug or output == "":
        # disable noisy logging of flask, https://stackoverflow.com/a/18379764
        import logging

        flask_log = logging.getLogger("werkzeug")
        flask_log.disabled = True
        flask_cli = sys.modules["flask.cli"]
        flask_cli.show_server_banner = lambda *x: None

    app = Flask(
        __name__,
        root_path=os.getcwd(),
        static_url_path="/static",
        static_folder=build_dir + "/static",
    )

    if assets_dir:
        assets_blueprint = Blueprint(
            "additional_assets",
            __name__,
            static_url_path="/assets",
            static_folder=assets_dir,
        )
        app.register_blueprint(assets_blueprint)

    def json_reader(f):
        """Yield one parsed JSON value per non-blank line of ``f``."""
        import json

        for jsonline in f:
            # Blank lines (e.g. a trailing empty line) are not valid JSON.
            if jsonline.strip():
                yield json.loads(jsonline)

    def mephistoDBReader():
        """Yield ``get_data_from_unit`` for each unit of ``database_task_name``."""
        from mephisto.abstractions.databases.local_database import LocalMephistoDB
        from mephisto.tools.data_browser import DataBrowser as MephistoDataBrowser

        db = LocalMephistoDB()
        mephisto_data_browser = MephistoDataBrowser(db=db)

        units = mephisto_data_browser.get_units_for_task_name(database_task_name)
        for unit in units:
            yield mephisto_data_browser.get_data_from_unit(unit)

    def make_data_source():
        """Return an iterator over the raw review items for the configured input.

        When reading CSV with ``csv_headers`` the header row is consumed here;
        ``next`` is given a default so empty input yields nothing instead of
        raising StopIteration.
        """
        if database_task_name is not None:
            return mephistoDBReader()
        stdin_lines = iter(sys.stdin.readline, "")
        if json:
            return json_reader(stdin_lines)
        reader = csv.reader(stdin_lines)
        if csv_headers:
            next(reader, None)
        return reader

    def load_all_data(data_source):
        """ALL mode: replace ``all_data_list`` with every item of ``data_source``.

        Each entry is ``{"data": item, "id": position}`` with 0-based positions.
        The list is built completely before it is published, so concurrent
        requests never observe a half-filled list.
        """
        global all_data_list, datalist_update_time
        all_data_list = [
            {"data": row, "id": position} for position, row in enumerate(data_source)
        ]
        datalist_update_time = datetime.now()

    def refresh_all_list_data():
        """ALL mode with a DB source: re-read the Mephisto DB so new units appear."""
        load_all_data(mephistoDBReader())

    def consume_data():
        """OBO mode: background-thread loop that publishes one item at a time.

        For each item a fresh ``ready_for_next`` event is installed, the item
        is exposed through ``current_data``/``counter`` and the thread blocks
        until a review submission sets the event. ``finished`` becomes True
        once the source is exhausted.
        """
        global ready_for_next, current_data, finished, counter

        data_source = make_data_source()
        finished = False
        counter = 0
        for row in data_source:
            ready_for_next = threading.Event()
            current_data = row
            counter += 1
            ready_for_next.wait()
        finished = True

    def consume_all_data(page, results_per_page=RESULTS_PER_PAGE_DEFAULT, filters=None):
        """ALL mode: return a filtered and optionally paginated view of the data.

        Params:
            page: 1-indexed page number; any non-int (e.g. None) disables
                pagination and returns every matching item.
            results_per_page: maximum number of results per page.
            filters: list of keywords/sentences; an item matches when every one
                occurs (case-insensitively) in ``str(item["data"])``. Anything
                other than a list disables filtering.
        Returns:
            A DataQueryResult(data_list, total_pages); ``total_pages`` is 1
            when not paginated.
        Raises:
            InvalidQueryError: for a non-positive page or results_per_page.
        """
        paginated = type(page) is int
        if paginated:
            # Explicit raises rather than ``assert``, which ``python -O`` strips.
            if page <= 0:
                raise InvalidQueryError(
                    "Page number should be a positive 1 indexed integer."
                )
            if type(results_per_page) is not int or results_per_page <= 0:
                raise InvalidQueryError("results_per_page should be a positive integer")

        if database_task_name is not None:
            # stdin is consumed once at startup, so only a DB-backed list can
            # be refreshed to pick up new entries.
            now = datetime.now()
            if (
                USE_TIMEOUT
                and (now - datalist_update_time).total_seconds() > TIMEOUT_IN_SECONDS
            ):
                refresh_all_list_data()

        data_list = all_data_list
        if type(filters) is list:
            terms = [word.lower() for word in filters]

            def matches(item):
                text = str(item["data"]).lower()
                return all(term in text for term in terms)

            data_list = [item for item in data_list if matches(item)]

        if not paginated:
            return DataQueryResult(data_list, 1)

        total_pages = math.ceil(len(data_list) / results_per_page)
        first_index = (page - 1) * results_per_page
        # Slicing past the end yields [], which covers out-of-range pages.
        return DataQueryResult(
            data_list[first_index : first_index + results_per_page], total_pages
        )

    def shutdown_server():
        """Ask the werkzeug development server to stop after this request.

        NOTE(review): the ``werkzeug.server.shutdown`` environ hook was removed
        in Werkzeug 2.1; on such versions this raises RuntimeError (the request
        fails) and the server keeps running.
        """
        func = request.environ.get("werkzeug.server.shutdown")
        if func is None:
            raise RuntimeError("Not running with the Werkzeug Server")
        func()

    def write_review(text):
        """Print ``text`` to stdout when ``output`` is "", else append it to ``output``."""
        if output == "":
            print(text)
            sys.stdout.flush()
        else:
            # Explicit encoding: reviews may contain non-ASCII text, which the
            # platform default encoding cannot always represent.
            with open(output, "a+", encoding="utf-8") as f:
                f.write(text + "\n")

    @app.route("/data_for_current_task")
    def data():
        """
        *** DEPRECATED ***
        For use in "one-by-one" or default mode.
        Returns the item currently being reviewed (as published by consume_data).
        If there is no more data to review the server is shut down.
        """
        if all_data:
            return jsonify(
                {
                    "error": 'mephisto review is in all mode, please get data by sending a GET request to "/data/:id"'
                }
            )
        if finished:
            shutdown_server()

        return jsonify(
            {"finished": finished, "data": current_data if not finished else None}
        )

    @app.route("/submit_current_task", methods=["GET", "POST"])
    def next_task():
        """
        *** DEPRECATED ***
        For use in "one-by-one" or default mode.
        Records a review and releases the next item. The review is the JSON
        body of a POST or the ``result`` query parameter of a GET; its ``str()``
        is written to the configured output.
        """
        if all_data:
            return jsonify(
                {
                    "error": 'mephisto review is in all mode, please submit reviews by sending a POST request to "/data/:id"'
                }
            )
        result = (
            request.get_json(force=True)
            if request.method == "POST"
            else request.args.get("result")
        )
        write_review("{}".format(result))

        ready_for_next.set()
        # Yield so the reader thread gets a chance to advance before the
        # response reports ``finished``/``counter``.
        time.sleep(0)
        return jsonify({"finished": finished, "counter": counter})

    @app.route("/data/<id>", methods=["GET", "POST"])
    def task_data_by_id(id):
        """
        Get (GET) or review (POST) the item with the given 0-based id.

        GET, ALL mode: returns the item at that position of the review list.
        GET, OBO mode: only the id of the current item (``counter - 1``) is
        accepted; once the input is exhausted the server is shut down.
        POST: the JSON body is written as ``ID: <id>, REVIEW: <body>``; in OBO
        mode this also releases the next item (the id is not checked).
        """
        # isdecimal (not isdigit): every decimal character is accepted by
        # int(), whereas isdigit also admits e.g. superscripts that int() rejects.
        id = int(id) if type(id) is int or (type(id) is str and id.isdecimal()) else None
        if request.method == "GET":
            if all_data:
                if id is None or id < 0 or id >= len(all_data_list):
                    return jsonify(
                        {"error": f"Data with ID: {id} does not exist", "mode": MODE}
                    )
                return jsonify({"data": all_data_list[id], "mode": MODE})
            if id is None or id != counter - 1:
                return jsonify(
                    {
                        "error": f"Please review the data point with ID: {counter - 1}",
                        "mode": MODE,
                    }
                )
            data = {
                "data": current_data if not finished else None,
                "id": counter - 1,
            }
            if finished:
                shutdown_server()
            return jsonify({"finished": finished, "data": data, "mode": MODE})

        review = request.get_json(force=True)
        write_review("ID: {}, REVIEW: {}".format(id, review))
        if not all_data:
            ready_for_next.set()
            time.sleep(0)
        return jsonify({"result": RESULT_SUCCESS, "finished": finished, "mode": MODE})

    @app.route("/data")
    def all_task_data():
        """
        ALL mode: returns the (filtered, optionally paginated) list of items.
        OBO mode: returns the id and data of the item currently under review;
        the id is its 0-based position in the input.
        Params:
            page: 1 indexed page number for results
            results_per_page: number of results to show per page, must be positive integer
            filters: string of keywords or sentences results must contain.
                Filters must be comma separated and spaces must be denoted by '%20'
        """
        if not all_data:
            data = {"data": current_data if not finished else None, "id": counter - 1}
            return jsonify({"data": data, "mode": MODE, "finished": finished})

        page = request.args.get("page", default=None, type=int)
        results_per_page = request.args.get(
            "results_per_page", default=RESULTS_PER_PAGE_DEFAULT, type=int
        )
        filters_str = request.args.get("filters", default=None, type=str)
        filters = None
        if type(filters_str) is str:
            # NOTE(review): Flask has already percent-decoded query values;
            # this extra unquote decodes any remaining escapes and is kept for
            # compatibility with existing clients.
            filters = [
                filt.strip() for filt in urllib.parse.unquote(filters_str).split(",")
            ]
        try:
            data = consume_all_data(page, results_per_page, filters)
        except InvalidQueryError as err:
            print(f"Error: {err.args[0]}")
            sys.stdout.flush()
            return jsonify({"error": err.args[0], "mode": MODE})
        return jsonify(
            {"data": data.data_list, "mode": MODE, "total_pages": data.total_pages}
        )

    @app.route("/", defaults={"id": None})
    @app.route("/<id>")
    def index(id):
        """Serve the single-page review UI for ``/`` and every ``/<id>`` path."""
        return send_file(build_dir + "/index.html")

    @app.after_request
    def after_request(response):
        """Add permissive CORS headers and disable caching on every response."""
        response.headers.add("Access-Control-Allow-Origin", "*")
        response.headers.add(
            "Access-Control-Allow-Headers", "Content-Type,Authorization"
        )
        response.headers.add(
            "Access-Control-Allow-Methods", "GET,PUT,POST,DELETE,OPTIONS"
        )
        response.headers.add("Cache-Control", "no-store")
        return response

    if all_data:
        # ALL mode: every item is loaded into memory before the app starts.
        load_all_data(make_data_source())
        finished = False
    else:
        # Give the routes defined values until the reader thread publishes
        # the first item (it may block on stdin, or find the input empty).
        current_data = None
        counter = 0
        finished = False
        ready_for_next = threading.Event()
        # daemon=True: the reader may be blocked on stdin or on
        # ready_for_next.wait(); it must not keep the process alive after the
        # server stops (e.g. on CTRL+C).
        thread = threading.Thread(
            target=consume_data, name="review-server-thread", daemon=True
        )
        thread.start()
    print("Running on http://127.0.0.1:{}/ (Press CTRL+C to quit)".format(port))
    sys.stdout.flush()
    app.run(debug=False, port=port)