# -*- coding: utf-8; -*-
import datetime
import os
import sys
from unittest.mock import patch, MagicMock
from wuttaweb.views import upgrades as mod
from wuttjamaican.exc import ConfigurationError
from wuttaweb.progress import get_progress_session
from wuttaweb.testing import WebTestCase
class TestUpgradeView(WebTestCase):
    def make_view(self):
        """Return a new ``UpgradeView`` constructed from the test request."""
        view = mod.UpgradeView(self.request)
        return view
    def test_includeme(self):
        """Include the upgrades view module into the Pyramid config."""
        # nb. nothing is asserted; the include only has to finish cleanly
        views_module = "wuttaweb.views.upgrades"
        self.pyramid_config.include(views_module)
    def test_configure_grid(self):
        """Run ``configure_grid()`` on a grid made for the Upgrade model."""
        model = self.app.model
        upgrades_view = self.make_view()

        # sanity / coverage check: nothing is asserted about the grid
        upgrades_grid = upgrades_view.make_grid(model_class=model.Upgrade)
        upgrades_view.configure_grid(upgrades_grid)
    def test_grid_row_class(self):
        """Check the row CSS class returned for each upgrade status."""
        model = self.app.model
        enum = self.app.enum
        upgrade = model.Upgrade(description="test", status=enum.UpgradeStatus.PENDING)
        # nb. row data is captured once, while the upgrade is still pending
        data = dict(upgrade)
        view = self.make_view()

        # a pending upgrade gets no special class
        self.assertIsNone(view.grid_row_class(upgrade, data, 1))

        # then step through the remaining statuses, in order
        expectations = (
            (enum.UpgradeStatus.EXECUTING, "has-background-warning"),
            (enum.UpgradeStatus.SUCCESS, None),
            (enum.UpgradeStatus.FAILURE, "has-background-warning"),
        )
        for status, css_class in expectations:
            upgrade.status = status
            actual = view.grid_row_class(upgrade, data, 1)
            if css_class is None:
                self.assertIsNone(actual)
            else:
                self.assertEqual(actual, css_class)
    def test_configure_form(self):
        """Check which fields survive ``configure_form()`` in each mode."""
        self.pyramid_config.add_route("upgrades.download", "/upgrades/{uuid}/download")
        model = self.app.model
        enum = self.app.enum

        barney = model.User(username="barney")
        self.session.add(barney)
        upgrade = model.Upgrade(
            description="test", created_by=barney, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        view = self.make_view()

        def make_instance_form():
            # fresh, not-yet-configured form bound to the saved upgrade
            return view.make_form(model_class=model.Upgrade, model_instance=upgrade)

        log_fields = ("stdout_file", "stderr_file")

        # some fields exist when viewing
        with patch.object(view, "viewing", new=True):
            form = make_instance_form()
            self.assertIn("created", form)
            view.configure_form(form)
            self.assertIn("created", form)

        # but then are removed when creating
        with patch.object(view, "creating", new=True):
            form = view.make_form(model_class=model.Upgrade)
            self.assertIn("created", form)
            view.configure_form(form)
            self.assertNotIn("created", form)

        # test executed, stdout/stderr when viewing
        with patch.object(view, "viewing", new=True):

            # executed is *not* shown by default
            form = make_instance_form()
            self.assertIn("executed", form)
            view.configure_form(form)
            for field in ("executed", *log_fields):
                self.assertNotIn(field, form)

            # but it *is* shown if upgrade is executed
            upgrade.executed = datetime.datetime.now()
            upgrade.status = enum.UpgradeStatus.SUCCESS
            form = make_instance_form()
            self.assertIn("executed", form)
            view.configure_form(form)
            for field in ("executed", *log_fields):
                self.assertIn(field, form)
    def test_objectify(self):
        """Creating an upgrade fills in ``created_by`` and a pending status."""
        model = self.app.model
        enum = self.app.enum
        barney = model.User(username="barney")
        self.session.add(barney)
        self.session.commit()
        view = self.make_view()

        # simulate a form POST submitted by that user
        self.request.user = barney
        self.request.method = "POST"
        self.request.POST = {"description": "new one"}

        # user and status are auto-set when creating
        with patch.object(view, "creating", new=True):
            form = view.make_model_form()
            self.assertTrue(form.validate())
            created = view.objectify(form)

            self.assertEqual(created.description, "new one")
            self.assertIs(created.created_by, barney)
            self.assertEqual(created.status, enum.UpgradeStatus.PENDING)
    def test_download_path(self):
        """``download_path()`` gives None without a filename, else a path."""
        model = self.app.model
        enum = self.app.enum

        # use a temp folder as the appdir
        appdir = self.mkdtemp()
        self.config.setdefault("wutta.appdir", appdir)
        self.assertEqual(self.app.get_appdir(), appdir)

        owner = model.User(username="barney")
        upgrade = model.Upgrade(
            description="test", created_by=owner, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        view = self.make_view()

        # expected folder splits the uuid after its first 2 characters
        uuid = str(upgrade.uuid)
        upgrade_dir = os.path.join(appdir, "data", "upgrades", uuid[:2], uuid[2:])

        # no filename
        self.assertIsNone(view.download_path(upgrade, None))

        # with filename
        self.assertEqual(
            view.download_path(upgrade, "foo.txt"),
            os.path.join(upgrade_dir, "foo.txt"),
        )
    def test_get_upgrade_filepath(self):
        """``get_upgrade_filepath()`` maps an upgrade to its data folder."""
        model = self.app.model
        enum = self.app.enum

        # use a temp folder as the appdir
        appdir = self.mkdtemp()
        self.config.setdefault("wutta.appdir", appdir)
        self.assertEqual(self.app.get_appdir(), appdir)

        owner = model.User(username="barney")
        upgrade = model.Upgrade(
            description="test", created_by=owner, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        view = self.make_view()

        # expected folder splits the uuid after its first 2 characters
        uuid = str(upgrade.uuid)
        expected_dir = os.path.join(appdir, "data", "upgrades", uuid[:2], uuid[2:])

        # no filename
        self.assertEqual(view.get_upgrade_filepath(upgrade), expected_dir)

        # with filename
        self.assertEqual(
            view.get_upgrade_filepath(upgrade, "foo.txt"),
            os.path.join(expected_dir, "foo.txt"),
        )
    def test_delete_instance(self):
        """Deleting an upgrade removes both its record and its files."""
        model = self.app.model
        enum = self.app.enum

        # use a temp folder as the appdir
        appdir = self.mkdtemp()
        self.config.setdefault("wutta.appdir", appdir)
        self.assertEqual(self.app.get_appdir(), appdir)

        owner = model.User(username="barney")
        upgrade = model.Upgrade(
            description="test", created_by=owner, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        view = self.make_view()

        # mock stdout/stderr files
        upgrade_dir = view.get_upgrade_filepath(upgrade)
        log_paths = []
        for filename, content in (("stdout.log", "stdout"), ("stderr.log", "stderr")):
            log_path = view.get_upgrade_filepath(upgrade, filename)
            with open(log_path, "w") as f:
                f.write(content)
            log_paths.append(log_path)
        tracked_paths = [upgrade_dir, *log_paths]

        # everything is present before the delete
        for path in tracked_paths:
            self.assertTrue(os.path.exists(path))
        self.assertEqual(self.session.query(model.Upgrade).count(), 1)

        # both upgrade and files are deleted
        with patch.object(view, "Session", return_value=self.session):
            view.delete_instance(upgrade)
        for path in tracked_paths:
            self.assertFalse(os.path.exists(path))
        self.assertEqual(self.session.query(model.Upgrade).count(), 0)
    def test_execute_instance(self):
        model = self.app.model
        enum = self.app.enum
        appdir = self.mkdtemp()
        self.config.setdefault("wutta.appdir", appdir)
        self.assertEqual(self.app.get_appdir(), appdir)
        user = model.User(username="barney")
        upgrade = model.Upgrade(
            description="test", created_by=user, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        view = self.make_view()
        self.request.user = user
        python = sys.executable
        # script not yet confiugred
        self.assertRaises(ConfigurationError, view.execute_instance, upgrade, user)
        # script w/ success
        goodpy = self.write_file(
            "good.py",
            """
import sys
sys.stdout.write('hello from good.py')
sys.exit(0)
""",
        )
        self.app.save_setting(
            self.session, "wutta.upgrades.command", f"{python} {goodpy}"
        )
        self.assertIsNone(upgrade.executed)
        self.assertIsNone(upgrade.executed_by)
        self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
        with patch.object(view, "Session", return_value=self.session):
            with patch.object(self.config, "usedb", new=True):
                view.execute_instance(upgrade, user)
        self.assertIsNotNone(upgrade.executed)
        self.assertIs(upgrade.executed_by, user)
        self.assertEqual(upgrade.status, enum.UpgradeStatus.SUCCESS)
        with open(view.get_upgrade_filepath(upgrade, "stdout.log")) as f:
            self.assertEqual(f.read(), "hello from good.py")
        with open(view.get_upgrade_filepath(upgrade, "stderr.log")) as f:
            self.assertEqual(f.read(), "")
        # need a new record for next test
        upgrade = model.Upgrade(
            description="test", created_by=user, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()
        # script w/ failure
        badpy = self.write_file(
            "bad.py",
            """
import sys
sys.stderr.write('hello from bad.py')
sys.exit(42)
""",
        )
        self.app.save_setting(
            self.session, "wutta.upgrades.command", f"{python} {badpy}"
        )
        self.assertIsNone(upgrade.executed)
        self.assertIsNone(upgrade.executed_by)
        self.assertEqual(upgrade.status, enum.UpgradeStatus.PENDING)
        with patch.object(view, "Session", return_value=self.session):
            with patch.object(self.config, "usedb", new=True):
                view.execute_instance(upgrade, user)
        self.assertIsNotNone(upgrade.executed)
        self.assertIs(upgrade.executed_by, user)
        self.assertEqual(upgrade.status, enum.UpgradeStatus.FAILURE)
        with open(view.get_upgrade_filepath(upgrade, "stdout.log")) as f:
            self.assertEqual(f.read(), "")
        with open(view.get_upgrade_filepath(upgrade, "stderr.log")) as f:
            self.assertEqual(f.read(), "hello from bad.py")
    def test_execute_progress(self):
        """Poll ``execute_progress()`` across several progress trackers.

        Checks that each tracker first receives the full stdout contents
        and afterwards only what was appended, and that marking the
        progress session complete or errored produces the matching flash
        message.
        """
        model = self.app.model
        enum = self.app.enum
        view = self.make_view()

        user = model.User(username="barney")
        self.session.add(user)
        upgrade = model.Upgrade(
            description="test", created_by=user, status=enum.UpgradeStatus.PENDING
        )
        self.session.add(upgrade)
        self.session.commit()

        # nb. this file is what the mocked get_upgrade_filepath() returns
        stdout = self.write_file("stdout.log", "hello 001\n")

        # nb. newline is converted to this markup in the returned stdout
        # NOTE(review): the markup had been lost from the expected strings
        # in this test (leaving unterminated literals); "<br />" is assumed
        # here -- confirm against UpgradeView.execute_progress()
        br = "<br />"

        self.request.matchdict = {"uuid": upgrade.uuid}
        with patch.multiple(
            mod.UpgradeView,
            Session=MagicMock(return_value=self.session),
            get_upgrade_filepath=MagicMock(return_value=stdout),
        ):

            # nb. this is used to identify progress tracker
            self.request.session.id = "mockid#1"

            # first call should get the full contents
            context = view.execute_progress()
            self.assertFalse(context.get("complete"))
            self.assertFalse(context.get("error"))
            self.assertEqual(context["stdout"], f"hello 001{br}")

            # next call should get any new contents
            with open(stdout, "a") as f:
                f.write("hello 002\n")
            context = view.execute_progress()
            self.assertFalse(context.get("complete"))
            self.assertFalse(context.get("error"))
            self.assertEqual(context["stdout"], f"hello 002{br}")

            # nb. switch to a different progress tracker
            self.request.session.id = "mockid#2"

            # first call should get the full contents
            context = view.execute_progress()
            self.assertFalse(context.get("complete"))
            self.assertFalse(context.get("error"))
            self.assertEqual(context["stdout"], f"hello 001{br}hello 002{br}")

            # mark progress complete
            session = get_progress_session(self.request, "upgrades.execute")
            session.load()
            session["complete"] = True
            session["success_msg"] = "yay!"
            session.save()

            # next call should reflect that
            self.assertEqual(self.request.session.pop_flash(), [])
            context = view.execute_progress()
            self.assertTrue(context.get("complete"))
            self.assertFalse(context.get("error"))
            # nb. this is missing b/c we already got all contents
            self.assertNotIn("stdout", context)
            self.assertEqual(self.request.session.pop_flash(), ["yay!"])

            # nb. switch to a different progress tracker
            self.request.session.id = "mockid#3"

            # first call should get the full contents
            context = view.execute_progress()
            self.assertFalse(context.get("complete"))
            self.assertFalse(context.get("error"))
            self.assertEqual(context["stdout"], f"hello 001{br}hello 002{br}")

            # mark progress error
            session = get_progress_session(self.request, "upgrades.execute")
            session.load()
            session["error"] = True
            session["error_msg"] = "omg!"
            session.save()

            # next call should reflect that
            self.assertEqual(self.request.session.pop_flash("error"), [])
            context = view.execute_progress()
            self.assertFalse(context.get("complete"))
            self.assertTrue(context.get("error"))
            # nb. this is missing b/c we already got all contents
            self.assertNotIn("stdout", context)
            self.assertEqual(self.request.session.pop_flash("error"), ["omg!"])
    def test_configure_get_simple_settings(self):
        """Smoke-test ``configure_get_simple_settings()`` on a fresh view."""
        # sanity/coverage check
        view = self.make_view()
        # nb. the return value is kept in a local but not inspected here
        simple = view.configure_get_simple_settings()