Make separate tasks for collect vs. transmit of upgrade progress data

This commit is contained in:
Lance Edgar 2022-08-20 22:40:16 -05:00
parent 0a113611e8
commit 2ca93a07e9
2 changed files with 72 additions and 31 deletions

View file

@ -116,7 +116,9 @@
<div class="form"> <div class="form">
<${form.component} <${form.component}
% if expose_websockets and master.has_perm('execute'): % if expose_websockets and master.has_perm('execute'):
% if instance_executable:
@execute-upgrade-click="executeUpgrade" @execute-upgrade-click="executeUpgrade"
% endif
:upgrade-executing="upgradeExecuting" :upgrade-executing="upgradeExecuting"
@declare-failure-click="declareFailureClick" @declare-failure-click="declareFailureClick"
:declare-failure-submitting="declareFailureSubmitting" :declare-failure-submitting="declareFailureSubmitting"

View file

@ -40,6 +40,8 @@ class UpgradeExecutionProgressWS(WebsocketView):
'upgrades': {}, 'upgrades': {},
} }
new_messages = asyncio.Queue()
async def __call__(self, scope, receive, send): async def __call__(self, scope, receive, send):
app = self.get_rattail_app() app = self.get_rattail_app()
@ -116,10 +118,34 @@ class UpgradeExecutionProgressWS(WebsocketView):
progress_session = get_basic_session(self.rattail_config, progress_session = get_basic_session(self.rattail_config,
id=progress_session_id) id=progress_session_id)
# start collecting status, textout messages
asyncio.create_task(self.collect_status(uuid, progress_session))
asyncio.create_task(self.collect_textout(uuid))
upgrade_state = self.global_state['upgrades'][uuid] upgrade_state = self.global_state['upgrades'][uuid]
clients = upgrade_state['clients'] clients = upgrade_state['clients']
while clients: while clients:
msg = await self.new_messages.get()
# send message to all clients
for client in clients.values():
await client['send']({
'type': 'websocket.send',
'subtype': 'upgrades.execute_progress',
'text': json.dumps(msg)})
await asyncio.sleep(0.1)
# no more clients, no more reason to track this upgrade
del self.global_state['upgrades'][uuid]
async def collect_status(self, uuid, progress_session):
upgrade_state = self.global_state['upgrades'][uuid]
clients = upgrade_state['clients']
while True:
# load latest progress data # load latest progress data
progress_session.load() progress_session.load()
@ -134,45 +160,58 @@ class UpgradeExecutionProgressWS(WebsocketView):
user_session.flash(msg) user_session.flash(msg)
user_session.persist() user_session.persist()
# tell clients progress is complete # push "complete" message to queue
for client in clients.values(): await self.new_messages.put({'complete': True})
await client['send']({
'type': 'websocket.send',
'subtype': 'upgrades.execute_progress',
'text': json.dumps({'complete': True})})
# this websocket is done, so remove all clients # there will be no more status coming
clients.clear()
break break
# we will send this data down to client await asyncio.sleep(0.1)
data = {}
# maybe add more lines from command output async def collect_textout(self, uuid):
path = self.rattail_config.upgrade_filepath(uuid, filename='stdout.log') path = self.rattail_config.upgrade_filepath(uuid, filename='stdout.log')
offset = progress_session.get('stdout.offset', 0)
if os.path.exists(path): # wait until stdout file exists
while not os.path.exists(path):
# bail if upgrade is complete
if uuid not in self.global_state['upgrades']:
return
await asyncio.sleep(0.1)
offset = 0
while True:
# wait until we have something new to read
size = os.path.getsize(path) - offset size = os.path.getsize(path) - offset
if size > 0: while not size:
# bail if upgrade is complete
if uuid not in self.global_state['upgrades']:
return
# wait a whole second, then look again
# (the less frequent we look, the bigger the chunk)
await asyncio.sleep(1)
size = os.path.getsize(path) - offset
# bail if upgrade is complete
if uuid not in self.global_state['upgrades']:
return
# read the latest chunk and bookmark new offset
with open(path, 'rb') as f: with open(path, 'rb') as f:
f.seek(offset) f.seek(offset)
chunk = f.read(size) chunk = f.read(size)
data['stdout'] = chunk.decode('utf8').replace('\n', '<br />') textout = chunk.decode('utf_8')
progress_session['stdout.offset'] = offset + size offset += size
progress_session.save()
# send data to clients # push new chunk onto message queue
for client in clients.values(): textout = textout.replace('\n', '<br />')
await client['send']({ await self.new_messages.put({'stdout': textout})
'type': 'websocket.send',
'subtype': 'upgrades.execute_progress',
'text': json.dumps(data)})
# pause for 1 second await asyncio.sleep(0.1)
await asyncio.sleep(1)
# no more clients, no more reason to track this upgrade
del self.global_state['upgrades'][uuid]
@classmethod @classmethod
def defaults(cls, config): def defaults(cls, config):