print("Start of init")
import random
import gradio as gr
print("Gradio Installed")
import pandas as pd
import io, time
import micropip
from collections import defaultdict
## Install extra packages
#await micropip.install('regex')
#package_list = micropip.list()
#print(package_list)
print("Generic Import Starts")
#import urllib.request
import json, re
import os
import js, base64
print("Micropip Starts")
await micropip.install('plotly')
print("Plotly Installed")
#await micropip.install('pdfminer.six==20221105')
await micropip.install('python-docx==1.1.0')
print("Docx Installed")
await micropip.install("lzma")
print("LZMA Installed")
#await micropip.install('py3langid')
print("Langid Installed")
#await micropip.install('requests')
#print("Requests Installed")
#await micropip.install('beautifulsoup4')
#print("BeautifulSoup Installed")
#await micropip.install('certifi')
#await micropip.install('emoji')
#await micropip.install('nltk')
print("Micropip Finished")
micropip.list()
print("Packages Listed")
#import requests
#from bs4 import BeautifulSoup
#import certifi
#import nltk
#from nltk.tokenize import PunktTextTilingTokenizer
import plotly.graph_objects as go
#import pdfminer.high_level as hl
import docx
#import emoji
#import py3langid as langid
from datetime import datetime
from pyodide.http import pyfetch
#HF_TOKEN = "API_KEY"
#print("HF Token: ", HF_TOKEN)
HF_TOKEN = "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"
#HF_TOKEN = ""
#HF_TOKEN = ""
#print("HF Token: ", HF_TOKEN)
COGN_API_URL = "https://kpd7if9sc5qcx4oe.us-east-1.aws.endpoints.huggingface.cloud"
#v2 API_URL = "https://zry5b3k46dws8zor.us-east-1.aws.endpoints.huggingface.cloud"
API_URL = "https://iktfppaj5w29l0f3.us-east-1.aws.endpoints.huggingface.cloud"
headers = {
"Authorization": "Bearer "+HF_TOKEN+"",
"Content-Type": "application/json",
}
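# The two endpoints above are called with pyodide.http.pyfetch, sending a JSON body
# and authenticating with the standard Hugging Face "Bearer <token>" header built here.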
bp_ground_struct = {
"name": "Corporate Policies",
"gradients": [
{"Significant": 1.0},
{"High": 0.8},
{"Moderate": 0.6},
{"Low": 0.4},
{"Minimal": 0.2},
],
"blueprint": [
"Communication Policy",
"Information Handling",
"Conflicts of Interest",
"Fair Dealing"
],
"metamarkers": [
{
"Communication Policy": [
{"Casual conversation": 1},
{"Constructive feedback provided": 0.8},
{"Respectful tone even in disagreement": 0.8},
{"Occasional unprofessional remarks": 0.2},
{"Manipulative language": 0.1},
{"Abusive, inflammatory language": 0}
],
"Weight": 1
},
{
"Information Handling": [
{"Casual conversation": 1},
{"Proper access controls and encryption": 0.8},
{"Reasonable info protections in place": 0.6},
{"Inadvertent exposure addressed responsibly": 0.4},
{"Repeated minor unauthorized exposures": 0.2},
{"Sharing passwords": 0.2},
{"Intentional unauthorized exposure": 0}
],
"Weight": 1
},
{
"Conflicts of Interest": [
{"Casual conversation": 1},
{"Proactively discloses all conflicts": 0.8},
{"Abstains appropriately when conflicts arise": 0.6},
{"May neglect occasional minor conflicts": 0.4},
{"Fails to abstain from clear conflicts": 0.2},
{"Actively hides personal interests": 0}
],
"Weight": 1
},
{
"Fair Dealing": [
{"Casual conversation": 1},
{"Models equitable business practices": 0.8},
{"Demonstrates transparent process": 0.6},
{"Isolated minor process issues": 0.4},
{"Repeated unfair/non-transparent actions": 0.2},
{"Providing special treatment": 0.1},
{"Systematic discriminatory actions": 0}
],
"Weight": 1
}
]
}
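# Shape of a blueprint as used throughout this file: a display "name", a "gradients"
# scale mapping outcome labels to score thresholds (1.0 down to 0.2), a "blueprint"
# list of metamarker names, and a "metamarkers" list pairing each metamarker with
# scored nuance cues plus a relative "Weight". This ground structure is also fed to
# the "genBP" prompt below as the template a generated blueprint should imitate.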
# Global variable to store the start time
start_time = None
async def query(payload):
data = payload
#print("Data JSON: ", data)
response = await pyfetch(API_URL, method="POST", body=json.dumps(data), headers=headers)
#print("API Response: ", response)
response_json = await response.json()
#print("API Response JSON: ", response_json)
return response_json
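# Minimal usage sketch (hypothetical prompt and parameters), mirroring how generate() calls it:
#   result = await query({"inputs": "<formatted prompt>", "parameters": {"max_new_tokens": 250}})
#   text = result[0]["generated_text"]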
async def query_summ(payload):
data = payload
#response = requests.request("POST", COGN_API_URL, data=data, headers=headers)
response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers)
#print("API Response: ", response)
response = await response.json()
#print("API Response JSON: ", response)
return response
async def query_profile(input, blueprint, lang=False):
details= True
local = True
max_retries = 5
if lang:
output = ({
"inputs": input,
"blueprint": [blueprint, 'BlueprintScore'],
"parameters": [{"details":details}, {"local": local}, {"lang": lang}]
})
else:
output = ({
"inputs": input,
"blueprint": [blueprint, 'BlueprintScore'],
"parameters": [{"details":details}, {"local": local}]
})
#print("In query_profile with Output: ", output)
# Calling API with payload
#response = requests.post(COGN_API_URL, headers=headers, json=output)
response = None
for retry in range(max_retries):
    try:
        response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers)
        break
    except Exception as e:
        print("API request failed (%s), retrying attempt %d" % (e, retry + 1))
if response is None:
    print("API failed after %d retries" % max_retries)
    return ""
response = await response.json()
#print("API Response JSON: ", response)
# Returning JSON output
return response
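# The profiling endpoint's response is consumed downstream as a list:
#   response[0]["BlueprintScore"]  -> overall gradient score for the input
#   response[1]["mw_score"]        -> weighted score per metamarker
#   response[2]["n_scores"]        -> per-nuance scores for each metamarker
# (shape inferred from update_scores, format_text and annotate below)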
async def get_bp_structure(bp_id):
global blueprint_structure_global
try:
output = ({
"inputs": "",
"get_blueprint_structure": bp_id
})
# Calling API with payload
#response = requests.post(COGN_API_URL, headers=headers, json=output)
response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers)
response = await response.json()
except Exception as e:
print(e)
return "Pick a Blueprint from the list."
# Uncomment for dataframe
"""
data = response
df_rows = []
for section in data["m_structure"]:
for question, options in section.items():
if question != "Weight":
for option in options:
score = list(option.values())[0]
text = list(option.keys())[0]
row = [question, text, score]
df_rows.append(row)
df = pd.DataFrame(df_rows, columns=["Metamarkers", "Nuances", "Gradient Scale"])
"""
#return gr.DataFrame(df)
blueprint_structure_global = response["m_structure"]
return blueprint_structure_global
copilot_instruct_global = ""
blueprint_structure_global = None
def format_prompt(message, history, task, metamarker=None, proof=None, outcome=None):
global copilot_instruct_global
prompt = "<s>"
if task == "summarise":
print("Summarise task")
message = str(message)
prompt += f"[INST]Summarise the following Client's Suitability Review: {message}.[/INST]"
elif task == "checklist":
print("Checklist Prep task")
message = str(message)
prompt += f"[INST]Extract important information from this old report and list them into bullet points. Seperate it into 'Financial Profile' and 'Personal Circumanstances' categories. Old Report:\n {message} [/INST]"
elif task == "insight":
#print("Investment Goal Evolution task")
#print("Conversation: ", message)
#print("Summary History: ", history)
message = str(message)
prompt += f"[INST]In two sentences, Generate a summary of the Client's evolving investment goal appetite between this recent conversation: {message} \n and the past Client report with their old appetite: {history}[/INST]"
elif task == "genBP":
print("generate BP")
prompt += f"[INST]Generate a JSON following this exact structure:\n {bp_ground_struct} \n -and adapt name, blueprint, metamarkers cues parameters based on the following name and description: {message}. \n - The metamarkers cues should be detectable from written interactions.[/INST]"
elif task == "opti":
print("Experimental Opti")
prompt += f"[INST]Your task is to rewrite the given text in 5 sentences maximum, keeping only the most crucial information based on the provided blueprint. Follow these instructions carefully: Read through the entire blueprint. Familiarize yourself with all items, including questions, statements, and topics. Analyze the given text and identify ONLY the parts that are DIRECTLY relevant to the blueprint items with higher than 0 weights. Completely disregard any information that doesn't align with these high-weight items, even if it seems important in the original context. Rewrite the text in 5 sentences or fewer, ensuring that you: a. Include ONLY the most crucial information based on the blueprint items with higher weights. b. Maintain the original meaning and context of the relevant information. c. Use clear and concise language. d. Include important figures and specific details that are directly related to the high-weight blueprint items. Do not add any new information, commentary, or analysis that isn't present in the original text AND directly relevant to the high-weight blueprint items. If you find no relevant information for any high-weight blueprint item, return an empty string. Do not include any explanations, metadata about your process, or irrelevant details that doesn't directly address blueprint items above 0 weight. Remember: Focus EXCLUSIVELY on information that is directly relevant to the blueprint items with higher weights. Completely ignore any information, no matter how interesting or contextually important, if it doesn't directly address the high-weight blueprint items. Maintain the accuracy of the original information while optimizing for brevity. Do not make assumptions or inferences beyond what is explicitly stated in the text and directly relevant to the high-weight blueprint items. Ensure your output is exactly 5 sentences or fewer, or an empty string if no relevant information is found. Text: {message} Blueprint: {history}[/INST]"
elif task == "gen rationale":
print("gen rationale")
prompt += f"[INST] Do NOT directly quote the instructions. Criteria: {metamarker} Proof: {proof}. Base your answer on the Proof. In YOUR OWN WORDS, provide a 2 SENTENCE plain language rationale no more than 40 words long explaining why this Outcome occurred. Outcome: '{outcome}'. At the end, list the decision and important dates, cite the important figures and scores supporting the Outcome. [/INST]"
elif task == "copilot insight":
print("gen copilot insight")
#Format message
# Normalize whitespace
message = " ".join(message.split())
# Remove extra line breaks
message = re.sub(r"\n\s*", " ", message)
# Strip extra spaces between words
message = re.sub(r"\s\s+", " ", message)
# Remove hyphen bullet markers without touching intra-word hyphens
message = re.sub(r"(^|\s)-\s+", r"\1", message)
# Standardize space around punctuation
message = re.sub(r"\s+([\.?,!:])", r"\1", message)
prompt += f"[INST]{copilot_instruct_global} \n\n Background: {history}\n\n My Query: {message}\n\n[/INST]"
else:
print("Other Task")
#print("Conversation: ", message)
prompt += f"[INST]You are my dedicated advisor, You will try to help me find the best possible solution to my request. Let's have a chat.\n\n- Don't repeat yourself.\n- Keep your messages relatively short.\n{message}[/INST] "
return prompt
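# Example (sketch): with no recognised task the default branch returns a single
# [INST] instruction block wrapped in the "<s>" BOS token, e.g.
#   format_prompt("Hello", "", "")  ->  "<s>[INST]You are my dedicated advisor. ... Hello[/INST] "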
async def generate(prompt, history, task, temperature, max_new_tokens, top_p, repetition_penalty, presence_penalty, metamarker=None, proof=None, outcome=None):
print("In generate", prompt)
if temperature is None and task is None:
task = ""
temperature = 0.9
max_new_tokens = 250
top_p = 0.9
repetition_penalty = 1.0
presence_penalty = 1.5
payload = {
'inputs': format_prompt(prompt, history, task, metamarker, proof, outcome),
'parameters': {
'temperature': temperature,
'max_new_tokens': max_new_tokens,
'top_p': top_p,
'repetition_penalty': repetition_penalty,
'presence_penalty': presence_penalty,
'generator': True,
'do_sample': False,
'use_cache': True
}
}
print("Payload: ", payload)
#demo.update()
#state.profile_scores = scores
cleaned_text = ""
try:
output = await query(payload)
#print("Output: ", output[0]["generated_text"])
#Remove everything before and including [/INST]
cleaned_text = re.sub(r'^.*?\[/INST\]\s*', '', output[0]["generated_text"], flags=re.DOTALL)
# Remove any remaining [INST] or [/INST] tags
cleaned_text = re.sub(r'\[/?INST\]', '', cleaned_text)
# Remove any leading/trailing whitespace
cleaned_text = cleaned_text.strip()
#print("Cleaned Output: ", cleaned_text)
except Exception as e:
print(e)
return "Error: API request unsuccessful. Please wait a few minutes and try again."
try:
return cleaned_text
#return output[0]["generated_text"]
except Exception as e:
print(e)
return output[0]
### Generate Blueprint ###
async def generate_bp(blueprint_title, blueprint_description):
print("In Generate Blueprint")
try:
bp_direction = f"name: {blueprint_title}\nDescription: {blueprint_description}"
#bp_direction = f"{blueprint_description}"
blueprint_json = await generate(bp_direction, "", "genBP", 0.9, 1097, 0.9, 1, 1.1)
blueprint_json = re.sub(r'^[^{}]*', '', blueprint_json) # Remove everything before the first '{'
blueprint_json = re.sub(r'}[^{}]*$', '}', blueprint_json) # Remove everything after the last '}'
#print("Blueprint JSON: ", blueprint_json)
# load text to a json object
blueprint_json = json.loads(blueprint_json)
#print("Blueprint JSON loaded: ", blueprint_json)
return blueprint_json
except Exception as e:
print(e)
return "Add a Blueprint Title and Description"
# Create the initial figure
fig = go.Figure(
layout=dict(
title='Blueprint Scatter Graph',
xaxis_title='Timestamp',
yaxis_title='Score'),
)
# Define marker properties
marker_size = 10
marker_color = [0] # Initial marker colour value (mapped through the colorscale)
marker_colorscale = 'Plasma' # Choose a color scale
show_marker_scale = True
# Create the initial trace with placeholder marker properties
initial_trace = go.Scatter(x=[datetime.now(), datetime.now()], y=[0, 10], mode='markers', showlegend=False,)
# Add the marker properties to the initial trace
initial_trace.update(marker=dict(
cmax=10,
cmin=0,
size=1,
color=marker_color,
colorscale=marker_colorscale,
showscale=show_marker_scale
))
async def ocr_file_base64(file_path):
print("In OCR")
with open(file_path, 'rb') as file:
encoded_file = base64.b64encode(file.read()).decode('utf-8')
print("Encoded File: ", encoded_file)
# Chunk encoded file as it's too large for API
#chunk_size = 1900000 # 1.9MB - Ensure this is divisible by 4
data = {
"inputs": "",
"extract_text": encoded_file
}
response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers)
response = await response.json()
return response
async def nox_audio_base64(audio_file_path):
print("In Nox Audio")
# Open the audio file in binary mode
with open(audio_file_path, "rb") as audio_file:
# Read the contents of the file
encoded_audio = base64.b64encode(audio_file.read()).decode('utf-8')
#print("Encoded File: ", encoded_audio)
data = ({
"inputs": "",
"get_nox": encoded_audio
})
# Calling API with payload
response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers)
response = await response.json()
return response[1]
# Add the initial trace to the figure
fig.add_trace(initial_trace)
async def update_scores(prompt, blueprint):
try:
print("In update_scores")
prnt("Prompt: ", prompt)
blueprint = json.loads(blueprint)
bp_result = await query_profile(prompt, blueprint)
#print("Blueprint Result", bp_result)
score = round((float(bp_result[0]["BlueprintScore"])),2)
#print("Scores: ", score)
if score < 0:
score = 0.00
# empty bp_result[1]["mw_score"] to avoid error
bp_result[1]["mw_score"] = {}
scores = [score]
#print("Scores: ", scores)
for i in range(len(scores)):
#print("Individual scores: ", scores[i])
new_trace = go.Scatter(x=[datetime.now()],
y=[float(scores[i])*10],
mode='markers',
showlegend=False,
marker=dict(
cmax=10,
cmin=0,
size=10,
color=[float(scores[i])*10] , #set color equal to a variable
colorscale='Plasma', # one of plotly colorscales
showscale=False
)
)
fig.add_trace(new_trace)
return score, bp_result[1]["mw_score"], gr.Plot(fig)
except Exception as e:
print(e)
return "Select a Blueprint First", "", gr.Plot(fig)
def plot_timeline(files):
if not files:
# If no files, return an empty figure
# Create the initial figure
fig = go.Figure(
layout=dict(
title='No Files Uploaded',
xaxis_title='Date',
yaxis_title='File Name',
yaxis_tickfont=dict(size=10),
xaxis_tickformat='%Y-%m-%d',
hovermode='closest'),
)
return fig
fig = go.Figure(
layout=dict(
title='File Timeline',
xaxis_title='Date',
yaxis_title='File Name',
yaxis_tickfont=dict(size=12),
xaxis_tickformat='%Y-%m-%d',
hovermode='closest'),
)
for file in files:
file_path = file.name
file_stats = os.stat(file_path)
creation_date = datetime.fromtimestamp(file_stats.st_ctime)
modification_date = datetime.fromtimestamp(file_stats.st_mtime)
print("Creation Date: ", creation_date)
print("Modification Date: ", modification_date)
fig.add_trace(go.Scatter(
x=[creation_date, modification_date],
y=[os.path.basename(file_path), os.path.basename(file_path)],
mode='markers',
showlegend=False,
marker=dict(color=['green', 'blue'], colorscale='Plasma'),
text=[f'Created: {creation_date.date()}', f'Modified: {modification_date.date()}'],
hoverinfo='text',
line=dict(color='black'),
name=os.path.basename(file_path)
))
fig.update_layout(showlegend=False, width=850, height=400, hovermode='closest')
return fig
def sliding_window_sentences(text, window_size=3, step_size=1):
    """
    Split text into sentences and yield overlapping windows of them.
    Args:
        text (str): The text to split.
        window_size (int): Number of sentences in each window.
        step_size (int): Number of sentences to shift the window.
    Yields:
        list: A list of stripped sentences representing the current window.
    """
    print("In sliding_window")
    sentences = []
    current = ""
    for char in text:
        current += char
        if char in '.?!\n':
            if current.strip():
                sentences.append(current.strip())
            current = ""
            if len(sentences) == window_size:
                yield sentences[:window_size]
                sentences = sentences[step_size:]
    # Flush any unterminated trailing sentence and the final partial window
    if current.strip():
        sentences.append(current.strip())
    if sentences:
        yield sentences
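# Usage sketch (the call sites in format_text/annotate are currently commented out):
#   for window in sliding_window_sentences("One. Two. Three. Four.", window_size=3, step_size=1):
#       print(window)
#   # -> ['One.', 'Two.', 'Three.'], then ['Two.', 'Three.', 'Four.'], then the tail ['Three.', 'Four.']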
global_total_score = 0
global_num_sentences = 0
global_meta_scores = []
global_metamarker_averages = {}
scored_sentences = []
#Format Text and call model
async def format_text(text, fileName, blueprint):
print("In format_text")
global global_total_score
global global_num_sentences
global global_meta_scores
global global_metamarker_averages
global scored_sentences
#lowercase
#text = text.lower()
#remove punctuation
#text = text.translate(str.maketrans('', '', string.punctuation))
#remove whitespace
#remove empty strings
#text = list(filter(None, text))
# Define a regex pattern for splitting sentences
#sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n)')
#sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))')
#sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.)(?<!\d\.)(?=\d)|(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))|(?<!\d)\.(?!\d)')
#sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)\s')
# Use the pattern to split the text into sentences
sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)')
sentences = sentence_pattern.split(text)
# with nltk
#sentences = nltk.sent_tokenize(text)
#windows = sliding_window_sentences(text)
#sentences = list(windows)
print("Sentences: ", sentences)
scored_sentences_str = []
total_score = 0
num_sentences = 0
meta_sums = defaultdict(float)
meta_counts = defaultdict(int)
# for each sentence run Model
for sentence in sentences:
# Skip empty sentences
if not sentence.strip():
continue
bp_result = await query_profile(sentence, blueprint)
#print("Blueprint Result", bp_result)
#print("Blueprint Nuances Scores", bp_result[2]["n_scores"])
score = round((float(bp_result[0]["BlueprintScore"])),2)
#print("Scores: ", score)
if score >= 0:
total_score += score
num_sentences += 1
metamarkers_score = bp_result[1]["mw_score"]
#print("Metamarkers Scores: ", metamarkers_score)
for key, value in metamarkers_score.items():
#print("Metamarkers Key: ", key)
#print("Metamarkers Value: ", value)
meta_sums[key] += value
meta_counts[key] += 1
#print("Metamarkers Sums: ", meta_sums)
# Create a string with the metamarkers scores
meta_str = ""
for key, value in metamarkers_score.items():
meta_str += f"{key}: {value} | "
#print("Metamarkers String: ", meta_str)
nuances_score = bp_result[2]["n_scores"]
#print("Nuances Scores: ", nuances_score)
#Trim trailing |
meta_str = meta_str[:-3]
# append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName
scored_sentences_str.append([sentence, score, meta_str])
scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName])
# Calculate averages
## scores
if num_sentences > 0:
avg_score = total_score / num_sentences
else:
avg_score = 0
## metamarkers
meta_avgs = {}
for key in meta_sums.keys():
meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2)
#print("Metamarkers Averages: ", meta_avgs)
# Update globals
global_total_score += avg_score
global_meta_scores.append(meta_avgs)
meta_avg_str = ""
for key, value in meta_avgs.items():
meta_avg_str += f" {key}: {value} |</br>"
# Trim the trailing " |</br>" (each entry ends with it)
meta_avg_str = meta_avg_str[:-7]
#text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>"
text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary"
# comment to hide details
#text += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br> -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str])
#text += "</details></br> <b>-- End of File -- </b></br></br></br>"
text = "Start of File: "+fileName+"\n\n"+re.sub('\s+',' ', text)+"\n\n -- End of File -- \n\n\n"
return text
triggers_list = []
num_triggers = 0
# Extract insight
async def extract_insight(files, blueprint, get_rationale = False, threshold_enabled = False, threshold_range = None, threshold_operator = None, n_dependency = None, flash_attn = False):
print("In extract_insight")
global start_time
# Start the timer
start_time = time.time()
blueprint = json.loads(blueprint)
# reset global values for new batch
global global_total_score
global global_meta_scores
global global_metamarker_averages
global scored_sentences
global_total_score = 0
global_meta_scores = []
global_metamarker_averages = {}
scored_sentences = []
global triggers_list
global num_triggers
#print("Triggers List: ", triggers_list)
#print("Num Triggers: ", num_triggers)
if files is None:
end_time = time.time() # Record end time
elapsed_time = end_time - start_time
# Print elapsed time to console (or update a Gradio component)
print(f"Function took {elapsed_time:.2f} seconds to execute")
return "Please Upload a Supported File First" , ""
try:
batched_text = ""
for file in files:
file_path = file.name
file_name = os.path.basename(file.name)
if file_path.endswith('.pdf'):
text = ""
text = await ocr_file_base64(file_path)
print("OCR Text extracted: ", text)
elif file_path.endswith('.docx'):
document = docx.Document(file_path)
text = ""
text = "\n".join(paragraph.text + " " for paragraph in document.paragraphs)
elif file_path.endswith('.mp3') or file_path.endswith('.wav'):
text = ""
text = await nox_audio_base64(file_path)
else:
end_time = time.time() # Record end time
elapsed_time = end_time - start_time
# Print elapsed time to console (or update a Gradio component)
print(f"Function took {elapsed_time:.2f} seconds to execute")
return "Unsupported File Format", "Supported: PDF, TXT, DOCX, MP3, WAV"
#print("Text extracted: ", text)
if flash_attn:
# text should go through Opti instructions first
print("Text before Opti: ", text)
# Clean the blueprint into a digestible string
processed_blueprint = []
processed = []
# Process blueprint items and metamarkers
print("Blueprint: ", blueprint)
"""for metamarkers in blueprint['metamarkers']:
for marker, details in metamarkers.items():
if marker != 'Weight':
processed.append(f"{marker}")
for nuance in details:
for key, value in nuance.items():
processed.append(f"{key}, Score: {value}")
processed_blueprint = "\n".join(processed)
print("Processed Blueprint: ", processed_blueprint)"""
# If the text is longer than 100,000 characters, split it into chunks without cutting sentences
if len(text) > 100000:
print("Text longer than 100000 char")
chunks = []
current_chunk = ""
sentences = re.split(r'(?<=[.!?])\s+', text)
for sentence in sentences:
if len(current_chunk) + len(sentence) <= 100000:
current_chunk += sentence + " "
else:
chunks.append(current_chunk.strip())
current_chunk = sentence + " "
if current_chunk:
chunks.append(current_chunk.strip())
# Process each chunk
processed_text = ""
for chunk in chunks:
processed_chunk = await generate(chunk, blueprint, "opti", 0.1, 10000, 0.2, 1.1, 0.1)
processed_text += processed_chunk + " "
text = processed_text.strip()
print("Text after Opti: ", text)
else:
text = await generate(text, blueprint, "opti", 0.1, 10000, 0.2, 1.1, 0.1)
print("Text after Opti: ", text)
batched_text += await format_text(text, file_name, blueprint)
#bp_result = await query_profile(batched_text, blueprint)
#score = round((float(bp_result[0]["BlueprintScore"])),2)
# After processing all files
num_files = len(files)
# divide global total avg_score by number of files
global_avg_score = global_total_score / num_files
#print("Global Average Score: ", global_avg_score)
for meta_scores in global_meta_scores:
# Iterate over each metamarker in the file
for metamarker, score in meta_scores.items():
# Update the running total for each metamarker
global_metamarker_averages.setdefault(metamarker, 0)
# divide global total avg_score by number of files
global_metamarker_averages[metamarker] += score / num_files
# round to 2 decimals
global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker], 2)
# Calculate the average for each metamarker
gradients = blueprint["gradients"]
sorted_gradients = sorted(blueprint["gradients"], key=lambda x: list(x.values())[0])
for metamarker in global_metamarker_averages:
#global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker] / num_files, 2)
#print("Global Metamarkers Averages: ", global_metamarker_averages)
global_meta_avg_str = ""
rows = []
for key, value in global_metamarker_averages.items():
global_meta_avg_str += f" {key}: {value} |</br>"
trigger_outcome = "Uncat"
highest_gradient = 0
for gradient in gradients:
for gradient_key, gradient_value in gradient.items():
if value >= gradient_value and gradient_value >= highest_gradient:
trigger_outcome = gradient_key
highest_gradient = gradient_value
row = {
"Metamarkers": key,
"Output": value,
"Gradient Match": trigger_outcome,
"Rationale": "",
"Source": ""
}
rows.append(row)
# Iterate through each scored sentence and update the Rationale
lower_bound = 0.0
higher_bound = 1.0
rationale_gen = ""
document_proof = []
document_name = ""
previous_doc = ""
tracked_nuances = {}
nuance_weight = 0
global_avg_score = 0
metamarkers_global_weights = 0
for sentence_data in scored_sentences:
sentence = sentence_data[0]
sentence_scores = sentence_data[2]
sentence_n_scores = sentence_data[3]
document_name = sentence_data[4]
#print("Sentence nuances scores: ", sentence_n_scores)
#HACK: this should eventually be a parameter of the model handler.
# For each sentence, take the highest-scoring nuance per metamarker and add it to that
# metamarker's score. If that nuance was already counted as the top score for a previous
# sentence of this metamarker, do not add it again.
# TODO: add a find-and-break option.
# If the same nuance reappears with a higher score, keep the bigger one by removing the
# previous contribution from the total before adding the new one.
if n_dependency:
# reset global avg score
for metamarker in sentence_scores:
#sentence_scores[metamarker] = 0
top_nuance = max(sentence_n_scores[metamarker],
key=sentence_n_scores[metamarker].get)
#print("Top Nuance Score: ", sentence_n_scores[metamarker][top_nuance])
# if sentence_n_scores[metamarker][top_nuance] exist and is bigger than previous sentence_n_scores[metamarker][top_nuance]
if top_nuance not in tracked_nuances.get(metamarker, []):
print("Top Nuance not tracked yet or new nuance score greater than previous")
tracked_nuances.setdefault(metamarker, []).append(top_nuance)
# this score is correct
#print(f"Sentence Score Before adding: {sentence_scores[metamarker]}")
#print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}")
# get the nuance weight from the blueprint associated with the top_nuance and metamarker
#print("Blueprint metamarkers: ", blueprint["metamarkers"])
top_nuance_strip = top_nuance.rstrip('.')
for metamarker_dict in blueprint["metamarkers"]:
if metamarker in metamarker_dict:
categories = metamarker_dict[metamarker]
for category in categories:
if top_nuance_strip in category:
nuance_weight = category[top_nuance_strip]
break
break
#print(f"Nuance Weight {nuance_weight}")
sentence_scores[metamarker] += nuance_weight * sentence_n_scores[metamarker][top_nuance]
#sentence_scores[metamarker] += sentence_n_scores[metamarker][top_nuance]
#print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}")
if sentence_n_scores[metamarker][top_nuance] > sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]:
#print("Top Nuance not tracked yet or new nuance score greater than previous")
tracked_nuances.setdefault(metamarker, []).append(top_nuance)
# this score is correct
#print(f"Sentence Score Before adding: {sentence_scores[metamarker]}")
#print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}")
# get the nuance weight from the blueprint associated with the top_nuance and metamarker
#print("Blueprint metamarkers: ", blueprint["metamarkers"])
top_nuance_strip = top_nuance.rstrip('.')
for metamarker_dict in blueprint["metamarkers"]:
if metamarker in metamarker_dict:
categories = metamarker_dict[metamarker]
for category in categories:
if top_nuance_strip in category:
nuance_weight = category[top_nuance_strip]
break
break
#print(f"Nuance Weight {nuance_weight}")
#TODO
# if new sentence_n_scores[metamarker][top_nuance] is bigger than the tracked previous one for the same nuance then remove the previous score from sentence_scores[metamarker] and then += the new one with the associated nuance weight
# add the score of the top nuance to the metamarker score multiplied by the blueprint nuance weight
# remove the previous sentence_n_scores[metamarker][top_nuance] from sentence_scores[metamarker] and then += the new one with the associated nuance weight
#print("New Nuance Score is bigger than previous one")
#print(f"Previous Nuance Score: {sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}")
#print(f"New Nuance Score: {sentence_n_scores[metamarker][top_nuance]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}")
sentence_scores[metamarker] -= nuance_weight * sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]
sentence_scores[metamarker] += nuance_weight * sentence_n_scores[metamarker][top_nuance]
#print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}")
# Iterate through metamarkers
#print("Sentence Data Score before loop: ", sentence_scores)
for metamarker, score in sentence_scores.items():
#print("Sentence Data Score in loop to update Dataframe: ", sentence_scores)
# Find the corresponding row in the table
#print(f"Metamarker: {metamarker} | Score: {score} | Sentence: {sentence}")
for i, row in enumerate(rows):
if row["Metamarkers"] == metamarker:
# Check if the sentence score falls between the current and next gradient values
lower_bound, higher_bound = find_lower_and_higher(row["Output"], sorted_gradients.copy())
#print("Lower Bound: ", lower_bound)
#print("Higher Bound: ", higher_bound)
# If the threshold is enabled and the sentence score passes threshold_range, set the Output value to the sentence score and stop looping through metamarkers
if threshold_enabled:
print("Threshold Enabled")
#print("Threshold Range: ", threshold_range)
threshold_range_value = list(threshold_range.values())[0]
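# NOTE: the comparisons in this block are built as strings and passed to eval().
# A safer equivalent (sketch) would map the operator symbol to the stdlib operator module:
#   import operator
#   ops = {"<": operator.lt, ">": operator.gt}
#   if ops[threshold_operator](score, threshold_range_value): ...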
if eval(str(score) + threshold_operator + str(threshold_range_value)):
#print(f"Score {threshold_operator} {threshold_range_value}")
# Split source into docs
docs = row["Source"].split("\n")
# Remove empty strings
docs = list(filter(None, docs))
# Check if current doc seen
if document_name not in row["Source"]:
row["Source"] += f" File: {document_name} "
# Append the sentence to the "Rationale" with appropriate formatting
score = round(score, 3)
# Check if Output is already assigned
if "Output" not in row or eval(str(score) + threshold_operator + str(row["Output"])):
if score > 1:
score = 1
row["Output"] = score
trigger_outcome = "Uncat"
highest_gradient = 0
for gradient in gradients:
for gradient_key, gradient_value in gradient.items():
if row["Output"] >= gradient_value and gradient_value >= highest_gradient:
trigger_outcome = gradient_key
highest_gradient = gradient_value
#print("Trigger Outcome: ", trigger_outcome)
#print("Highest gradient: ", highest_gradient)
row["Gradient Match"] = trigger_outcome
#print("Score: ", score)
row["Rationale"] += f"{sentence} | \n "
#row["Rationale"] += f"{sentence} {score} | "
break
elif score >= lower_bound and score <= higher_bound:
# Split source into docs
docs = row["Source"].split("\n")
# Remove empty strings
docs = list(filter(None, docs))
# Check if current doc seen
if document_name not in row["Source"]:
row["Source"] += f" File: {document_name} "
# Append the sentence to the "Rationale" with appropriate formatting
score = round(score, 3)
row["Rationale"] += f"{sentence} {score} | "
print("Updated Rationale:", row["Rationale"])
# append to document_proof the sentence and its score for associated to the metamarker and the fileName
#document_proof.append([sentence, score, metamarker, sentence_data[3]])
#print("Document Proof: ", document_proof)
break # Stop searching for the row once found
#print("Sentence Scores: ", sentence_scores)
# Generate written rationale
if get_rationale or flash_attn:
for row in rows:
# if rationale is not empty
if row["Rationale"] != "":
# If the rationale is too long for a single generation call, split it into parts,
# generate a partial rationale for each part, then join the results below.
max_tokens = 20000
buffer = 1000
tokens = re.split(r'\W+', row["Rationale"])
if len(tokens) > max_tokens:
    print("Rationale exceeds the %d-token limit, splitting into parts" % max_tokens)
print("Rationale Length: ", len(row["Rationale"]))
print("Rationale: ", row["Rationale"])
print("Rationale Type: ", type(row["Rationale"]))
#rationale_parts = [row["Rationale"][i:i+max_tokens] for i in range(0, len(row["Rationale"]), max_tokens)]
#print("Rationale Parts Split: ", rationale_parts)
rationale = row["Rationale"]
rationale_parts = []
while len(rationale) > 0:
part = rationale[:max_tokens]
rationale_parts.append(part)
rationale = rationale[max_tokens:]
generated_rationales = []
for part in rationale_parts:
generated = await generate("", "", "gen rationale", 0.3, 1300, 0.3, 1.15, 0, row["Metamarkers"], part, row["Gradient Match"])
generated_rationales.append(generated)
full_rationale = " ".join(generated_rationales)
row["Rationale"] = full_rationale
else:
row["Rationale"] = await generate("", "", "gen rationale", 0.3, 1300, 0.3, 1.15, 0, row["Metamarkers"], row["Rationale"], row["Gradient Match"])
#print("Global Metamarker Avg.: ", global_metamarker_averages.items())
# Uncomment to obtain manual Triggers
"""
if num_triggers>0:
for key, value in global_metamarker_averages.items():
trigger_outcome = ""
#print("Num Triggers: ", num_triggers)
for i in range(num_triggers):
trigger = triggers_list[i]
name = trigger[0]
operator = trigger[1]
threshold = trigger[2]
if eval(str(value) + operator + str(threshold)):
if trigger_outcome == "":
trigger_outcome = name
else:
trigger_outcome += ", " + name
row = {
"Metamarkers": key,
"Meta_Score": value,
"Trigger Outcome": trigger_outcome,
"Rationale": ""
}
rows.append(row)
else:
for key, value in global_metamarker_averages.items():
row = {
"Metamarkers": key,
"Meta_Score": value,
"Trigger Outcome": "",
"Rationale": ""
}
rows.append(row)
data_for_dataframe = pd.DataFrame(rows)
"""
# sum the total of row["Output"] from each rows
print("Rows: ", rows)
for row in rows:
print("Row: ", row)
for metamarkers in blueprint["metamarkers"]:
print("Metamarkers: ", metamarkers)
if row["Metamarkers"] in metamarkers:
metamarkers_global_weights+= metamarkers["Weight"]
print("Metamarkers Global Weights: ", metamarkers_global_weights)
global_avg_score += row["Output"] * metamarkers["Weight"]
print("Global Average Score: ", global_avg_score)
global_avg_score = global_avg_score / metamarkers_global_weights if metamarkers_global_weights else 0
global_avg_score_gradient = ""
# Round global_avg_score to 3 decimals
global_avg_score = round(global_avg_score, 3)
#based on global_avg_score find the blueprint gradient it falls into
for gradient in sorted_gradients:
for gradient_key, gradient_value in gradient.items():
print("Gradient Key: ", gradient_key)
if global_avg_score >= gradient_value:
print("Global Average Score: ", global_avg_score)
print("Gradient Value: ", gradient_value)
global_avg_score_gradient = gradient_key
break
data_for_dataframe = pd.DataFrame(rows)
#print("Dataframe: ", data_for_dataframe)
# Trim the trailing " |</br>"
global_meta_avg_str = global_meta_avg_str[:-7]
print("Metamarkers Global Weights: ", metamarkers_global_weights)
insight = ""
insight = f"<h4>Gradient Match Outcome: <b>{global_avg_score_gradient}</b></br>Global Weighted Average: <b>{global_avg_score}</b></h4>"
#insight = f"<b>Batch General Score: {global_avg_score:.3f}</b> </br></br><b>Batch Metamarkers General Score: </b> </br> {global_meta_avg_str} </br></br></br></br> <details><summary><b>Insight Details </b></summary></br>{batched_text}</details>"
# make df_dl_btn visible
#dl_df_btn = gr.Button(value ="Download Data Frame π₯", visible=True, interactive= True)
# Save DataFrame to CSV file
#csv_filename = "./b_df_export.csv"
#data_for_dataframe.to_csv(csv_filename, index=True)
#print("ABS Path to CSV File: ", os.path.abspath(csv_filename))
#dfcsv = gr.File(csv_filename, label="Export DataFrame", scale=5, visible=False)
#copy_df_btn = gr.Button(value ="Copy Data Frame π", visible=True, interactive= True, scale=1)
save_df_btn = gr.Button(value ="Save Data Frame Result πΎ", visible=True, interactive= True, scale=1)
# End the timer and calculate the elapsed time
end_time = time.time()
elapsed_time = end_time - start_time
# Format the elapsed time to display it in the terminal
elapsed_time_str = f"Function execution time: {elapsed_time:.2f} seconds"
return insight, data_for_dataframe, save_df_btn, elapsed_time_str
except Exception as e:
print(e)
# End the timer and calculate the elapsed time
end_time = time.time()
elapsed_time = end_time - start_time
# Format the elapsed time to display it in the terminal
elapsed_time_str = f"Function execution time: {elapsed_time:.2f} seconds"
return "Pick a Blueprint"
def find_lower_and_higher(score, gradients):
# Find the nearest gradient thresholds below and above the given score (expects gradients sorted by value)
#print("In find_lower_and_higher")
lower_bound = 0
higher_bound = 1
for gradient in gradients:
label, value = list(gradient.items())[0]
if value <= score:
lower_bound = max(lower_bound, value)
elif value > score:
higher_bound = min(higher_bound, value)
return lower_bound, higher_bound
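# Usage sketch: with the default gradients (0.2 .. 1.0), a score of 0.5 sits between
# the "Low" and "Moderate" thresholds:
#   find_lower_and_higher(0.5, sorted_gradients)  # -> (0.4, 0.6)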
async def get_bp(bp_selector):
print("In get_bp")
#print("Selected Blueprint: ", bp_selector)
bp_structure = await get_bp_structure(bp_selector)
threshold_enabled = gr.Checkbox(label="Conditional Threshold π", info="Enable to isolate and focus on specific Outcomes", interactive= True)
extract_insight_btn = gr.Button(value="Extract Insight 🧠", interactive=True)
# get blueprint name
print("Blueprint Structure: ", bp_structure)
bp_name = bp_structure['name']
# Get the blueprint's marker list
bp_blueprint = bp_structure['blueprint']
bp_gradient = bp_structure['gradients']
phantom_description = f"<center> <h3>Multilingual Interactions Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. </br><b>π No data retained nor future training required.</b> </p> </center> </br><p><b>Protocole:</b> '{bp_name}' </p><p><b>Markers:</b> '{bp_blueprint}'</p><p><b>Scale:</b> '{bp_gradient}'</p>"
scale_gradients = bp_gradient
colors = {
0: "#F7FFF7",
0.25: "#FDE0DD",
0.5: "#21918c",
0.75: "#5ec962",
1: "#fde725"
}
html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>"
for gradient in scale_gradients:
label, value = list(gradient.items())[0]
color = colors[min(colors.keys(), key=lambda x: abs(x - value))]
html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>"
html_content += "</div>"
return bp_selector, gr.Code(value=json.dumps(bp_structure, indent=2, ensure_ascii=False)), phantom_description, html_content, threshold_enabled, extract_insight_btn
## Gradio Interface##
max_triggers = 3
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
saved_insight_df = pd.DataFrame()
saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True)
def variable_outputs(k):
global num_triggers
k = int(k)
num_triggers = k
outputs = []
for i in range(max_triggers):
if i < k:
visible = True
else:
visible = False
outputs.append(gr.Textbox(visible=visible, interactive= True))
outputs.append(gr.Radio(["<", ">"], visible=visible, interactive= True))
outputs.append(gr.Slider(visible=visible, interactive= True))
return outputs
def get_triggers_list_values(trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9):
#print("Triggers: ", trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9)
global triggers_list
# add triggers to a 2d list 3 by 3
triggers_list = []
triggers_list.append([trig1, trig2, trig3])
triggers_list.append([trig4, trig5, trig6])
triggers_list.append([trig7, trig8, trig9])
#print("Triggers List: ", triggers_list)
def threshold_condition(tc, bp_structure):
print("In TC")
print("Threshold Condition: ", tc)
bp_structure = json.loads(bp_structure)
print("Blueprint Structure: ", bp_structure["gradients"])
threshold_range = gr.Dropdown(bp_structure["gradients"], label="Threshold Range", info="Select what should be filtered 'in' or 'out' of the dataframe display", visible=True, interactive=tc)
threshold_operator = gr.Radio(["<", ">"], label="Threshold Operator", visible=True, interactive=tc)
n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable to avoid overlaps between metamarkers.", visible=True, interactive= tc)
return threshold_range, threshold_operator, n_dependency
def build_structure(title, gradients, blueprint, metamarkers):
print("In BuildBP")
structure = {
"name": title,
"gradients": [{"Significant": 1.0}],
"blueprint": blueprint.splitlines(),
"metamarkers": [
{item.split(":")[0].strip(): [{"Casual conversation": 1}] for item in metamarkers.splitlines() if ":" in item}
]
}
print("Structure: ", structure)
# Function to handle button click and generate CSV
def df_to_csv(df):
print("In df_to_csv")
# Save DataFrame to CSV file
csv_filename = "b_df_export.csv"
df.to_csv(csv_filename, index=False)
fileobj = gr.File(csv_filename, label="Exported DataFrame", scale=5, visible=True)
return fileobj
def copy_df(df):
print("In copy_df")
# Copy DataFrame to clipboard
print("Dataframe: ", df)
df.to_clipboard(index=False)
return "DataFrame copied to clipboard"
def save_df(df):
print("In save_df with: ", df)
global saved_insight_df
# save DF in different DF in Copilot Tab
#saved_insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True)
#TODO Check if new_row exists in df
#saved_insight_df = saved_insight_df.append(df)
saved_insight_df = pd.concat([saved_insight_df, df]).drop_duplicates().reset_index(drop=True)
empty_df_btn = gr.Button(value="Empty Data Frame 🗑️", visible=True, interactive=True, scale=1)
return saved_insight_df, empty_df_btn
def empty_saved_df():
print("In empty_saved_df")
global saved_insight_df
saved_insight_df = pd.DataFrame()
saved_insight_df_gr = gr.Dataframe(pd.DataFrame(), headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True)
return saved_insight_df_gr
async def get_copilot_insight(prompt, df, temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct):
global copilot_instruct_global
global blueprint_structure_global
copilot_instruct_global = copilot_instruct
#transform blueprint_structure_global dict to string
blueprint_structure_global = json.dumps(blueprint_structure_global)
# add the json blueprint as a string to the instruct
copilot_instruct = copilot_instruct + " \n Blueprint:" + blueprint_structure_global
print("In get_copilot_insight")
print("Prompt: ", prompt)
print("DF: ", df)
print("Blueprint in Get Insight: ", blueprint_structure_global)
print("Copilot Instruct: ", copilot_instruct)
# get the value of the column rationale from the dataframe
if df is None:
return "No Data Provided"
elif isinstance(df, str):
background = df
background = re.sub(r'<details>.*?</details>', '', background, flags=re.DOTALL)
print("Rationale From String: ", background)
else:
background = df['Rationale'].astype(str).reset_index(drop=True).values
print("Rationale From DF: ", background)
# generate insight from dataframe
copilot_insight = await generate(prompt, background, "copilot insight", temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider)
return copilot_insight
#HACK: these helpers duplicate logic above; move them to the API side and make them multithreaded
async def annotate(feed, blueprint):
fileName = "Feed"
try:
blueprint = json.loads(blueprint)
print("Blueprint: ", blueprint)
#results = await query_profile(feed, blueprint)
global global_total_score
global global_num_sentences
global global_meta_scores
global global_metamarker_averages
global scored_sentences
#lowercase
#text = text.lower()
#remove punctuation
#text = text.translate(str.maketrans('', '', string.punctuation))
#remove whitespace
#remove empty strings
#text = list(filter(None, text))
# Define a regex pattern for splitting sentences
sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)')
# Use the pattern to split the text into sentences
sentences = sentence_pattern.split(feed)
# with nltk
#sentences = nltk.sent_tokenize(text)
#windows = sliding_window_sentences(text)
#sentences = list(windows)
scored_sentences_str = []
total_score = 0
num_sentences = 0
meta_sums = defaultdict(float)
meta_counts = defaultdict(int)
html = ""
sentences_grounded = ""
language_code = ""
# for each sentence run Model
for sentence in sentences:
# Skip empty sentences
if not sentence.strip():
continue
language_code, flag = detect_lang(sentence)
if language_code == "en":
bp_result = await query_profile(sentence, blueprint)
print("Text is English")
else:
bp_result = await query_profile(sentence, blueprint, lang=True)
# Get the top level blueprint score
blueprint_score = bp_result[0]["BlueprintScore"]
print("Blueprint Score: ", blueprint_score)
# Get the metamarker scores
metamarkers = bp_result[1]["mw_score"]
# Create the annotations
annotations = {}
mm_score_positive = False
for mm, score in metamarkers.items():
annotations[mm] = (str(round(score, 2)))
if score > 0:
mm_score_positive = True
print(annotations)
print("Sentence: ", feed)
# change the hexadecimal color based on the blueprint score going from gray-blue to green passing by red
colors = {
0: "#F7FFF7",
0.25: "#FDE0DD",
0.5: "#21918c",
0.75: "#5ec962",
1: "#fde725"
}
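# A score between two stops is coloured by linearly interpolating the RGB channels
# of the nearest lower and higher stops (the arithmetic in the else branch below).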
# Colour by the top-level blueprint score (the loop variable above only holds the last metamarker's score)
score = round(float(blueprint_score), 2)
if score <= 0.05:
hex_color = colors[0]
text_color = "#000000"
elif score >= 1:
hex_color = colors[1]
text_color = "#FFFFFF"
else:
lower = max([x for x in colors.keys() if x <= score])
higher = min([x for x in colors.keys() if x >= score])
color_range = higher - lower
progress = (score - lower) / color_range if color_range else 0.0
lower_color = colors[lower]
higher_color = colors[higher]
r1, g1, b1 = tuple(int(lower_color[i:i+2], 16) for i in (1, 3, 5))
r2, g2, b2 = tuple(int(higher_color[i:i+2], 16) for i in (1, 3, 5))
r = round(r1 + (r2 - r1) * progress)
g = round(g1 + (g2 - g1) * progress)
b = round(b1 + (b2 - b1) * progress)
hex_color = "#{:02x}{:02x}{:02x}".format(r, g, b)
# if hexa is dark then text color is white else black
text_color = "#FFFFFF" if (r*0.299 + g*0.587 + b*0.114) < 186 else "#000000"
highlighted = f"<mark style='background: {hex_color} !important; color: {text_color}'>{sentence}</mark>"
#highlighted = f"<mark style='background: #00ced1 !important'> {sentence} </mark>"
#print("Blueprint Result", bp_result)
#print("Blueprint Nuances Scores", bp_result[2]["n_scores"])
score = round((float(bp_result[0]["BlueprintScore"])),2)
#print("Scores: ", score)
# only count not grounded sentences if at least one annotation is greater than 0
if mm_score_positive:
html += f"{highlighted}<i> - Gradient Score: </i>{score}</br> - <i>Metamarkers: </i>{annotations}</br> <img src='{flag}' width='20'/></br>"
total_score += score
num_sentences += 1
metamarkers_score = bp_result[1]["mw_score"]
#print("Metamarkers Scores: ", metamarkers_score)
for key, value in metamarkers_score.items():
#print("Metamarkers Key: ", key)
#print("Metamarkers Value: ", value)
meta_sums[key] += value
meta_counts[key] += 1
#print("Metamarkers Sums: ", meta_sums)
# Create a string with the metamarkers scores
meta_str = ""
for key, value in metamarkers_score.items():
meta_str += f"{key}: {value} | "
#print("Metamarkers String: ", meta_str)
nuances_score = bp_result[2]["n_scores"]
#print("Nuances Scores: ", nuances_score)
#Trim trailing |
meta_str = meta_str[:-3]
# append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName
scored_sentences_str.append([sentence, score, meta_str])
scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName])
else:
# if score 0 or less, put the sentence in a list of sentences grounded
sentences_grounded += f"{highlighted}</br> - Gradient Score: </i>{score}</br> - <i>Metamarkers: </i>{annotations}</br>"
# Calculate averages
## scores
if num_sentences > 0:
avg_score = total_score / num_sentences
else:
avg_score = 0
## metamarkers
meta_avgs = {}
for key in meta_sums.keys():
meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2)
#print("Metamarkers Averages: ", meta_avgs)
# Update globals
global_total_score += avg_score
global_meta_scores.append(meta_avgs)
meta_avg_str = ""
for key, value in meta_avgs.items():
meta_avg_str += f" {key}: {value} |</br>"
# Trim the trailing " |</br>"
meta_avg_str = meta_avg_str[:-7]
#feed_details = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>Global Feed Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>"
feed_details = f"<b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary"
# comment to hide details
feed_details += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br> -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str])
feed_details += "</details></br> <b>-- End of Feed -- </b></br></br></br>"
feed_details = "\n\n"+re.sub('\s+',' ', feed_details)+"\n\n\n"
# format html in a better way with a header first
html = f"<h3>Feed Insight</h3></br>{html} </br> </br><details>"
html += f"<b>Grounded Segments: </b></br>{sentences_grounded}</details>"
return html, feed_details, round(avg_score,2), meta_avgs
except Exception as e:
print(e)
# Match the arity of the normal return above; `sentence` may be undefined if the failure happened early
return f"Error: {e}", "", 0, {}
async def annotate_url(url, blueprint):
try:
#response = requests.get(url)
#response.raise_for_status()
#soup = BeautifulSoup(response.content, 'html.parser')
# Extract text based on your HTML structure:
#all_text = soup.get_text() # Gets all visible text
#text_from_paragraphs = [p.get_text() for p in soup.find_all('p')] # Text from <p> tags
#print("All text:",all_text)
#print("Text from Ps in a list", text_from_paragraphs)
#concatenated_text = " ".join(text_from_paragraphs)
#print("Concatenated Text: ", concatenated_text)
#return await annotate(concatenated_text, blueprint)
return "Fetaure Coming"
except Exception as e:
print(e)
return "Error fetching URL"
def detect_lang(text):
"""Detect Language"""
try:
language_code, _ = langid.classify(text)
# Map language codes to emoji flags
lang_to_flag = {
'en': 'https://flagcdn.com/w40/us.png',
'es': 'https://flagcdn.com/w40/es.png',
'fr': 'https://flagcdn.com/w40/fr.png',
'de': 'https://flagcdn.com/w40/de.png',
'it': 'https://flagcdn.com/w40/it.png',
'pt': 'https://flagcdn.com/w40/br.png',
'ru': 'https://flagcdn.com/w40/ru.png',
'ja': 'https://flagcdn.com/w40/jp.png',
'ko': 'https://flagcdn.com/w40/kr.png',
'zh': 'https://flagcdn.com/w40/cn.png',
'hi': 'https://flagcdn.com/w40/in.png',
'pt-br': 'https://flagcdn.com/w40/br.png',
'uk': 'https://flagcdn.com/w40/ua.png',  # 'uk' is the ISO code for Ukrainian
}
flag = lang_to_flag.get(language_code, 'https://blankstate.ai/wp-content/uploads/2024/01/w.gif')
print("Flag: ", flag)
# use emoji library and code to display the actual emoji flag
#flag = emoji.emojize(flag, language='alias')
except Exception:
language_code = 'en'
flag = 'https://blankstate.ai/wp-content/uploads/2024/01/w.gif'
return language_code, flag
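# Usage sketch: detect_lang("Bonjour tout le monde") would return ("fr", <French flag URL>)
# once py3langid is installed and imported as `langid`; with the install/import at the
# top of this file commented out, the except branch falls back to "en" and the default image.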
additional_inputs = [
gr.Textbox(label="Task", lines=1, value="", interactive= False, visible= False),
gr.Slider(label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False),
gr.Slider(label="Max new tokens", value=250, minimum=0, maximum=1097, step=64, interactive=True, visible= False),
gr.Slider(label="Top-p", value=0.90, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False),
gr.Slider(label="Repetition penalty", value=1.0, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False),
gr.Slider(label="Presence penalty", value=1.5, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False),
]
css = "style.css"
company_logo = "https://blankstate.ai/wp-content/uploads/2023/11/logo_blankstate.ai_dark.png"
#ey_company_logo = "https://bberry.ai/wp-content/uploads/2023/09/EY.png"
company_banner = "https://blankstate.ai/wp-content/uploads/2023/11/Plan-de-travail-16@2x-8.png"
"""blueprint_list = [
["AML High Risk Client Identification","bp_10_AML"],
["Corporate Governance", "bp_4"],
["Corporate Policies", "bp_1"],
["Client Service Assessment", "bp_9_CSA"],
["Risk Profile", "bp_0"],
["Financial Reporting Compliance", "bp_3"],
["Legal Breach Monitoring","bp_2"],
["Portfolio Analysis", "bp_8_PA"],
["Reputation", "bp_5"],
["Client Satisfaction", "bp_11_CH"],
["Fact Finder Requirements", "bp_12_RR_FF"]
]"""
blueprint_list = [
["Fact Find", "bp_12_RR_FF"],
["Risk Profile", "bp_13_RR_RP"],
["Suitability Report", "bp_14_RR_SR"],
["Ongoing Services Review", "bp_23_OSR"],
["Financial Services Complaints Management", "bp_22_FSCM"],
["Emerging Markets Equity Risk Analysis", "bp_15_RR_EMERA"],
["Emerging Markets Equity Risk Analysis - BR", "bp_15_RR_EMERA_BR"],
["Emerging Markets Equity Risk Analysis - HI", "bp_15_RR_EMERA_HI"],
["Emerging Markets Equity Risk Analysis - ZH", "bp_15_RR_EMERA_ZH"],
["Corporate Policies", "bp_1"],
["Corporate Policies - BR", "bp_16_COP_BR"],
["Corporate Policies - ZH", "bp_16_COP_ZH"],
["DORA", "bp_17_DORA"],
["Behavioural Market Segmentation", "bp_21_BMS"]
]
"""scale_gradients = [
{"Significant": 0.9},
{"High": 0.7},
{"Moderate": 0.5},
{"Low": 0.3},
{"Grounded": 0.1}
]"""
scale_gradients =[{"Select a Blueprint": 0.0}]
colors = {
0: "#F7FFF7",
}
html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>"
for gradient in scale_gradients:
label, value = list(gradient.items())[0]
color = colors[min(colors.keys(), key=lambda x: abs(x - value))]
html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>"
html_content += "</div>"
# phantom description on market sentiment and behaviour analysis in emerging markets
phantom_description = f"<center> <h3>Multilingual Behaviour Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. </br><b>π No data retained nor future training required.</b> </p> </center>"
with gr.Blocks(theme=gr.themes.Base()) as demo:
# add gr.header where error message will be displayed
with gr.Row():
with gr.Column(scale=2):
with gr.Tab('Blueprint Protocol'):
with gr.Row():
with gr.Column(scale=1):
with gr.Group():
bp_selector = gr.Dropdown(blueprint_list, value=None, label="Available Blueprints", interactive= True, visible= True, allow_custom_value=True)
blueprint_textbox = gr.Textbox(label="Blueprint", value="", interactive= True, visible= False)
with gr.Accordion("See Blueprint Details", open=False, visible=True):
blueprint_structure = gr.Code(label="Selected Blueprint Structure", interactive=True, visible= True, language="json", lines=30)
#blueprint_structure = gr.Dataframe(label="Selected Blueprint Structure", interactive= False, visible= True)
"""with gr.Tab('Generate a Blueprint'):
with gr.Column(scale=2):
with gr.Tab('Blueprint by Definition π'):
with gr.Row():
blueprint_title = gr.Textbox(label="Title", value="Employees Wellness", interactive= True, visible= True)
blueprint_description = gr.Textbox(label="Description", value="Detecting employees feeling and sentiment", interactive= True, visible= True)
with gr.Tab('Blueprint by Query π€ (coming)'):
blueprint_query = gr.Textbox(label="Question", placeholder="[coming soon]", interactive= False, visible= True)
gen_gb_btn = gr.Button(value ="Generate Blueprint βοΈ", interactive= True, visible= True)
blueprint_generated = gr.JSON(label="Generated Structure", interactive= False, visible= True, lines=30)
with gr.Tab('User Blueprint Builderπ'):
blueprint_drafted = gr.JSON(label="Blueprint Made From Scratch", interactive= False, visible= True, lines=30)"""
with gr.Column(scale=6):
with gr.Tab('Phantom'):
phantom_html = gr.HTML(label="Description", value=phantom_description, visible=True)
with gr.Column(scale=6):
with gr.Group():
copilot_instruct_phantom = gr.Textbox(label="Instruct", visible= False, value="Read the Feed Insight carefully. Base your answers and rationale on the Feed Insight provided. If no Feed Insight is provided, answer that you need more data to provide an answer. Follow your answer IMMEDIATELY with 2-3 sentence reasoning citing evidence based on the Blueprint structure and metamarker scores. You will be asked a PRIMARY core question, POSSIBLY accompanied by extra context. Analyze any additional context if present, BUT base your FINAL determination PRIMARILY on confirmation of the KEY DETAILS in the MAIN PRIMARY question itself.", placeholder="Instruct", lines=1, interactive= True)
copilot_entry_phantom = gr.Textbox(label="E.V.A", value="", placeholder="Gain Insight in Results", lines=1, interactive= True)
copilot_insight_result_phantom = gr.HTML(label="E.V.A Insight", value="")
with gr.Row():
with gr.Column(scale=5):
with gr.Row():
with gr.Column(scale=1):
with gr.Tab("Feed"):
feed = gr.Textbox(label="Text Feed", value="", lines=30, interactive= True)
annotate_btn = gr.Button(value="Run Phantom", interactive= True)
with gr.Tab("URL", interactive= False):
feed_url = gr.Textbox(label="URL Feed", value="", interactive= True)
annotate_url_btn = gr.Button(value="Run Phantom", interactive= True)
#lang_txt = gr.Textbox(label="Main Language Detected", value="π", lines=1, interactive= False)
with gr.Column(scale=1):
gradient_scale = gr.HTML(label="Blueprint Gradient Scale", value=html_content, visible=True)
feed_insight = gr.HTML(label="Feed Insight", value="Feed Insight")
with gr.Column(scale=2):
with gr.Group():
feed_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False)
feed_labels = gr.Label(label="Insight Metamarkers", value="", scale=1)
feed_details = gr.HTML(label="Insight Feed Details", value="", visible=False)
#chart_plot = gr.Plot(fig)
"""with gr.Column(scale=4):
profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False)
profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1)
chart_plot = gr.Plot(fig) """
with gr.Column(scale=6):
terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log", visible=False)
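# Batch Insight tab: upload historical files, configure thresholds and options, then extract insight into a dataframe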
with gr.Tab('Batch Insight'):
with gr.Tab('Historical Data'):
with gr.Column(scale=6):
with gr.Row():
with gr.Column(scale=6):
with gr.Row():
files_batch = gr.Files(label="Upload Files", height= 600, file_types=['.txt', '.docx', '.pdf', '.wav', '.mp3'])
with gr.Accordion("See Files Timeline", open=False, visible=True):
timeline_plot = gr.Plot()
with gr.Column(scale=6):
with gr.Group():
with gr.Row():
threshold_enabled = gr.Checkbox(label="Conditional Threshold π", info="Enable to isolate and focus on specific Outcomes", value=False, interactive= True, scale = 3)
threshold_operator = gr.Radio(["<", ">"], label="Threshold Operator", visible=True, interactive= False, scale=1)
threshold_range = gr.Dropdown(label="Threshold Range", info="Select what should be filtered 'in' or 'out' of the dataframe display", visible=True, interactive= False, scale=2)
with gr.Row():
n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable if your metamarkers are likely to interact with each other, so that their combined appearance (de)escalates the outcome", visible=True, interactive= False, scale=3)
flash_attn = gr.Checkbox(label="Flash Attention ⚡", info="Enable to remove general noise and speed up processing of large contexts. (Experimental)", value=False, interactive= True, scale = 3)
with gr.Row():
get_rationale = gr.Checkbox(label="Generate Written Rationale", info="Outcome will take slightly longer.", value=False, interactive= True, scale=2)
with gr.Column(scale=6):
extract_insight_btn = gr.Button(value ="Extract Insight 🧠", interactive= False)
with gr.Column(scale=6):
insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True)
with gr.Group():
with gr.Row():
save_df_btn = gr.Button(value ="Save Data Frame Result 💾", visible=True, interactive= False, scale=1)
with gr.Row():
csv_export = gr.File(label="Export DataFrame", scale=5, visible=False)
dl_df_btn = gr.Button(value ="Download Data Frame 📥", visible=False, interactive= True, scale=1)
copy_df_btn = gr.Button(value ="Copy Data Frame 📋", visible=False, interactive= True, scale=1)
files_insight_extract = gr.HTML(label="Insight Extracted", value="")
with gr.Column(scale=6):
terminal_batch = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log")
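# E.V.A tab: ask questions against the saved insight dataframe via get_copilot_insight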
with gr.Tab('E.V.A'):
with gr.Column(scale=6):
temp_slider = gr.Slider(label="Temperature", value=0.3, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False)
max_new_tokens_slider = gr.Slider(label="Max new tokens", value=350, minimum=0, maximum=1097, step=64, interactive=True, visible= False)
top_p_slider = gr.Slider(label="Top-p", value=0.3, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False)
rep_pen_slider = gr.Slider(label="Repetition penalty", value=0.87, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False)
pres_pen_slider = gr.Slider(label="Presence penalty", value=0, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False)
copilot_instruct = gr.Textbox(label="Instruct", visible= False, value="Read the Background carefully. Base your answers and rationale on the Background provided. If no Background is provided, answer that you need more data to provide an answer. 1. FIRST ALWAYS state a concrete: Yes, Maybe/Unclear or No at the start of your answer. 2. Follow IMMEDIATELY with 1-2 sentence reasoning citing evidence. You will be asked a PRIMARY core question, POSSIBLY accompanied by extra context. Analyze any additional context if present, BUT base your FINAL determination PRIMARILY on confirmation of the KEY DETAILS in the MAIN PRIMARY question itself: If BACKGROUND confirms the KEY PRIMARY DETAILS, mark Yes and cite quoted evidence. If SOME but NOT all PRIMARY DETAILS can be confirmed, mark Maybe/Unclear and cite partial evidence. ONLY mark No IF 20% or less of the PRIMARY question details have confirmation, and provide a reasoning.", placeholder="Instruct", lines=1, interactive= True)
copilot_entry = gr.Textbox(label="How can I help?", value="", placeholder="Gain Insight in Results", lines=1, interactive= True)
copilot_insight_result = gr.HTML(label="blank_ Insight", value="")
with gr.Row():
with gr.Column(scale=6):
#rationale = gr.Textbox(label="Captured Rationale", value="", placeholder="Rationale", lines= 15, interactive= False)
saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True)
empty_df_btn = gr.Button(value ="Empty Data Frame 🗑️", visible=True, interactive= False, scale=1)
with gr.Column(scale=6):
terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log")
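# Real-time Profiling tab: chat interface whose messages are scored live against the selected blueprint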
with gr.Tab('Real-time Profiling'):
with gr.Column(scale=6):
with gr.Row():
with gr.Column(scale=4):
chatbot = gr.ChatInterface(generate, additional_inputs=additional_inputs)
#with gr.Row():
# profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False)
# profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1)
with gr.Column(scale=2):
profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False)
profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1)
chart_plot = gr.Plot(fig)
with gr.Column(scale=6):
terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log")
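# Event wiring: connect UI components to their handler functions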
#buildBP.click(build_structure, [title, gradients, blueprint, metamarkers])
bp_selector.select(fn=get_bp, inputs=bp_selector, outputs=[blueprint_textbox, blueprint_structure, phantom_html, gradient_scale, threshold_enabled, extract_insight_btn])
threshold_enabled.change(fn=threshold_condition, inputs=[threshold_enabled, blueprint_structure], outputs=[threshold_range, threshold_operator, n_dependency])
#dl_df_btn.click(df_to_csv, inputs=[insight_df], outputs=[fileobj])
#copy_df_btn.click(copy_df, inputs=[insight_df])
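# A minimal sketch of a CSV export helper for the commented-out dl_df_btn wiring above
# (hypothetical: `df_to_csv` is not defined in this script; the name, signature and
# output path are assumptions). It writes the dataframe to a CSV file and returns the
# path, which could feed the `csv_export` gr.File component.
#def df_to_csv(df):
#    path = "insight_export.csv"
#    pd.DataFrame(df).to_csv(path, index=False)
#    return path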
chatbot.textbox.submit(fn=update_scores, inputs=[chatbot.textbox, blueprint_structure], outputs=[profile_score, profile_labels, chart_plot])
"""gen_gb_btn.click(
generate_bp,
inputs=[blueprint_title, blueprint_description],
outputs=[blueprint_generated],
api_name=False,
queue=False,
)"""
# When files are uploaded to files_batch, run plot_timeline to render the files timeline
files_batch.change(
plot_timeline,
inputs=[files_batch],
outputs=[timeline_plot],
api_name=False,
queue=False,
)
extract_insight_btn.click(
extract_insight,
inputs=[files_batch, blueprint_structure, get_rationale, threshold_enabled, threshold_range, threshold_operator, n_dependency, flash_attn],
outputs=[files_insight_extract, insight_df, save_df_btn, terminal_batch],
api_name=False,
queue=False,
)
save_df_btn.click(
save_df,
inputs=[insight_df],
outputs=[saved_insight_df_gr, empty_df_btn],
api_name=False,
queue=False,
)
empty_df_btn.click(
empty_saved_df,
inputs=[],
outputs=[saved_insight_df_gr],
api_name=False,
queue=False,
)
# Call function when copilot_entry is submitted
copilot_entry_phantom.submit(
get_copilot_insight,
inputs=[copilot_entry_phantom, feed_insight, temp_slider, max_new_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct_phantom],
outputs=[copilot_insight_result_phantom],
api_name=False,
queue=False,
)
copilot_entry.submit(
get_copilot_insight,
inputs=[copilot_entry, saved_insight_df_gr, temp_slider, max_new_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct],
outputs=[copilot_insight_result],
api_name=False,
queue=False,
)
feed.submit(
annotate,
inputs=[feed, blueprint_structure],
outputs=[feed_insight, feed_details, feed_score, feed_labels],
api_name=False,
queue=False,
)
annotate_btn.click(
annotate,
inputs=[feed, blueprint_structure],
outputs=[feed_insight, feed_details, feed_score, feed_labels],
api_name=False,
queue=False,
)
annotate_url_btn.click(
annotate_url,
inputs=[feed_url, blueprint_structure],
outputs=[feed_insight, feed_details, feed_score, feed_labels],
api_name=False,
queue=False,
)
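# Entry point: launch the Gradio app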
if __name__ == "__main__":
demo.launch()