Skip to content

Commit

Permalink
Split streamlist specific component from common_functions
Browse files Browse the repository at this point in the history
  • Loading branch information
mmartial committed May 3, 2024
1 parent c6d628d commit 0cc7d4d
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 45 deletions.
66 changes: 21 additions & 45 deletions common_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
import shutil
import fnmatch

import streamlit as st

import json

from datetime import datetime

iti_version="0.9.4"

def isBlank (myString):
return not (myString and myString.strip())

Expand Down Expand Up @@ -81,17 +81,17 @@ def check_dir(dir, text="checked directory"):

##

def check_existing_dir_w(dir, text="checked directory"):
def check_existing_dir_w(dir, text="checked"):
err = check_dir(dir, text=text)
if isNotBlank(err):
return(err)
if os.access(dir, os.W_OK) is False:
return(text+" directory ("+dir+") unable to write to")
return(f"{text} directory ({dir}) unable to write to")
return("")

###

def make_wdir(dir, text="destination directory"):
def make_wdir(dir, text="destination"):
if os.path.isdir(dir) is True:
return(check_existing_dir_w(dir, text))

Expand All @@ -105,6 +105,18 @@ def make_wdir(dir, text="destination directory"):

###

def make_wdir_recursive(dir, text="destination"):
if os.path.isdir(dir) is True:
return(check_existing_dir_w(dir, text))

try:
os.makedirs(dir, 0o755)
except OSError as e:
x = str(e)
return(f"Problem creating {text} directories {dir}: {x}")

return("")

def make_wdir_error(dest_dir):
err = make_wdir(dest_dir)
if isNotBlank(err):
Expand All @@ -127,16 +139,6 @@ def get_timeUTC():

#####

def get_runid():
if 'webui_runid' not in st.session_state:
st.session_state['webui_runid'] = get_timeUTC()

runid = st.session_state['webui_runid']
return runid

#####


def get_run_file(run_file):
err = check_file_r(run_file)
if isNotBlank(err):
Expand Down Expand Up @@ -173,50 +175,24 @@ def get_history(search_dir):
hist = {}
err, listing = get_dirlist(search_dir, "save location")
if isNotBlank(err):
st.error(f"While getting directory listing from {search_dir}: {err}, history will be incomplete")
return hist
return f"While getting directory listing from {search_dir}: {err}, history will be incomplete", hist
for entry in listing:
entry_dir = os.path.join(search_dir, entry)
err = check_existing_dir_w(entry_dir)
if isNotBlank(err):
st.error(f"While checking {entry_dir}: {err}, history will be incomplete")
continue
return f"While checking {entry_dir}: {err}, history will be incomplete", hist
for file in os.listdir(entry_dir):
if fnmatch.fnmatch(file, 'run---*.json'):
if fnmatch.fnmatch(file, 'run.json'):
run_file = os.path.join(entry_dir, file)
run_json = get_run_file(run_file)
if 'prompt' in run_json:
prompt = run_json['prompt']
hist[entry] = [prompt, run_file]
break
return hist
return "", hist

#####

def show_history(hist, allow_history_deletion, last_prompt_key, last_query_key):
hk = [x for x in hist.keys() if isNotBlank(x)]
hk = sorted(hk, reverse=True)
hk_opt = [hist[x][0] for x in hk]
hk_q = {hist[x][0]: hist[x][1] for x in hk}
prev = st.selectbox("Prompt History (most recent first)", options=hk_opt, index=0, key="history")
if st.button("Load Selected Prompt", key="load_history"):
st.session_state[last_prompt_key] = prev
st.session_state[last_query_key] = hk_q[prev]
if allow_history_deletion:
if st.button("Delete Selected Prompt", key="delete_history"):
if isNotBlank(prev):
dir = os.path.dirname(hk_q[prev])
err = directory_rmtree(dir)
if isNotBlank(err):
st.error(f"While deleting {dir}: {err}")
else:
if os.path.exists(dir):
st.error(f"Directory {dir} still exists")
else:
st.success(f"Deleted")
else:
st.error("Please select a prompt to delete")

def read_json(file, txt=''):
err = check_file_r(file)
if err != "":
Expand Down
39 changes: 39 additions & 0 deletions common_functions_WUI.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import os

import common_functions as cf

import streamlit as st


def get_runid():
if 'webui_runid' not in st.session_state:
st.session_state['webui_runid'] = cf.get_timeUTC()

runid = st.session_state['webui_runid']
return runid

#####

def show_history(hist, allow_history_deletion, last_prompt_key, last_query_key):
hk = [x for x in hist.keys() if cf.isNotBlank(x)]
hk = sorted(hk, reverse=True)
hk_opt = [hist[x][0] for x in hk]
hk_q = {hist[x][0]: hist[x][1] for x in hk}
prev = st.selectbox("Prompt History (most recent first)", options=hk_opt, index=0, key="history")
if st.button("Load Selected Prompt", key="load_history"):
st.session_state[last_prompt_key] = prev
st.session_state[last_query_key] = hk_q[prev]
if allow_history_deletion:
if st.button("Delete Selected Prompt", key="delete_history"):
if cf.isNotBlank(prev):
dir = os.path.dirname(hk_q[prev])
err = cf.directory_rmtree(dir)
if cf.isNotBlank(err):
st.error(f"While deleting {dir}: {err}")
else:
if os.path.exists(dir):
st.error(f"Directory {dir} still exists")
else:
st.success(f"Deleted")
else:
st.error("Please select a prompt to delete")

0 comments on commit 0cc7d4d

Please sign in to comment.