Blankstate Logo
Initialising Playground...
print("Start of init") import random import gradio as gr print("Gradio Installed") import pandas as pd import io, time import micropip from collections import defaultdict ## Install extra packages #await micropip.install('regex') #package_list = micropip.list() #print(package_list) print("Generic Import Starts") #import urllib.request import json, re import os import js, base64 print("Micropip Starts") await micropip.install('plotly') print("Plotly Installed") #await micropip.install('pdfminer.six==20221105') await micropip.install('python-docx==1.1.0') print("Docx Installed") await micropip.install("lzma") print("LZMA Installed") #await micropip.install('py3langid') print("Langid Installed") #await micropip.install('requests') #print("Requests Installed") #await micropip.install('beautifulsoup4') #print("BeautifulSoup Installed") #await micropip.install('certifi') #await micropip.install('emoji') #await micropip.install('nltk') print("Micropip Finished") micropip.list() print("Packages Listed") #import requests #from bs4 import BeautifulSoup #import certifi #import nltk #from nltk.tokenize import PunktTextTilingTokenizer import plotly.graph_objects as go #import pdfminer.high_level as hl import docx #import emoji #import py3langid as langid from datetime import datetime from pyodide.http import pyfetch #HF_TOKEN = "API_KEY" #print("HF Token: ", HF_TOKEN) HF_TOKEN = "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" #HF_TOKEN = "" #HF_TOKEN = "" #print("HF Token: ", HF_TOKEN) COGN_API_URL = "https://kpd7if9sc5qcx4oe.us-east-1.aws.endpoints.huggingface.cloud" #v2 API_URL = "https://zry5b3k46dws8zor.us-east-1.aws.endpoints.huggingface.cloud" API_URL = "https://iktfppaj5w29l0f3.us-east-1.aws.endpoints.huggingface.cloud" headers = { "Authorization": "Bearer "+HF_TOKEN+"", "Content-Type": "application/json", } bp_ground_struct = { "name": "Corporate Policies", "gradients": [ {"Significant": 1.0}, {"High": 0.8}, {"Moderate": 0.6}, {"Low": 0.4}, {"Minimal": 0.2}, ], "blueprint": [ "Communication Policy", "Information Handling", "Conflicts of Interest", "Fair Dealing" ], "metamarkers": [ { "Communication Policy": [ {"Casual conversation": 1}, {"Constructive feedback provided": 0.8}, {"Respectful tone even in disagreement": 0.8}, {"Occasional unprofessional remarks": 0.2}, {"Manipulative language": 0.1}, {"Abusive, inflammatory language": 0} ], "Weight": 1 }, { "Information Handling": [ {"Casual conversation": 1}, {"Proper access controls and encryption": 0.8}, {"Reasonable info protections in place": 0.6}, {"Inadvertent exposure addressed responsibly": 0.4}, {"Repeated minor unauthorized exposures": 0.2}, {"Sharing passwords": 0.2}, {"Intentional unauthorized exposure": 0} ], "Weight": 1 }, { "Conflicts of Interest": [ {"Casual conversation": 1}, {"Proactively discloses all conflicts": 0.8}, {"Abstains appropriately when conflicts arise": 0.6}, {"May neglect occasional minor conflicts": 0.4}, {"Fails to abstain from clear conflicts": 0.2}, {"Actively hides personal interests": 0} ], "Weight": 1 }, { "Fair Dealing": [ {"Casual conversation": 1}, {"Models equitable business practices": 0.8}, {"Demonstrates transparent process": 0.6}, {"Isolated minor process issues": 0.4}, {"Repeated unfair/non-transparent actions": 0.2}, {"Providing special treatment": 0.1}, {"Systematic discriminatory actions": 0} ], "Weight": 1 } ] } # Global variable to store the start time start_time = None async def query(payload): data = payload #print("Data JSON: ", data) response = await pyfetch(API_URL, method="POST", body=json.dumps(data), headers=headers) #print("API Response: ", response) response_json = await response.json() #print("API Response JSON: ", response_json) return response_json async def query_summ(payload): data = payload #response = requests.request("POST", COGN_API_URL, data=data, headers=headers) response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers) #print("API Response: ", response) response = await response.json() #print("API Response JSON: ", response) return response async def query_profile(input, blueprint, lang=False): details= True local = True max_retries = 5 if lang: output = ({ "inputs": input, "blueprint": [blueprint, 'BlueprintScore'], "parameters": [{"details":details}, {"local": local}, {"lang": lang}] }) else: output = ({ "inputs": input, "blueprint": [blueprint, 'BlueprintScore'], "parameters": [{"details":details}, {"local": local}] }) #print("In query_profile with Output: ", output) # Calling API with payload #response = requests.post(COGN_API_URL, headers=headers, json=output) for retry in range(max_retries): try: response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers) break except: print("API request failed, retrying attempt %d" % (retry+1)) if not response: response = "" print("API failed after %d retries" % max_retries) response = await response.json() #print("API Response JSON: ", response) # Returning JSON output return response async def get_bp_structure(bp_id): global blueprint_structure_global try: output = ({ "inputs": "", "get_blueprint_structure": bp_id }) # Calling API with payload #response = requests.post(COGN_API_URL, headers=headers, json=output) response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers) response = await response.json() except Error as e: print(e) return "Pick a Blueprint from the list." # Uncomment for dataframe """ data = response df_rows = [] for section in data["m_structure"]: for question, options in section.items(): if question != "Weight": for option in options: score = list(option.values())[0] text = list(option.keys())[0] row = [question, text, score] df_rows.append(row) df = pd.DataFrame(df_rows, columns=["Metamarkers", "Nuances", "Gradient Scale"]) """ #return gr.DataFrame(df) blueprint_structure_global = response["m_structure"] return blueprint_structure_global copilot_instruct_global = "" blueprint_structure_global = None def format_prompt(message, history, task, metamarker=None, proof=None, outcome=None): global copilot_instruct_global prompt = "<s>" if task == "summarise": print("Summarise task") message = str(message) prompt += f"[INST]Summarise the following Client's Suitability Review: {message}.[/INST]" elif task == "checklist": print("Checklist Prep task") message = str(message) prompt += f"[INST]Extract important information from this old report and list them into bullet points. Seperate it into 'Financial Profile' and 'Personal Circumanstances' categories. Old Report:\n {message} [/INST]" elif task == "insight": #print("Investment Goal Evolution task") #print("Conversation: ", message) #print("Summary History: ", history) message = str(message) prompt += f"[INST]In two sentences, Generate a summary of the Client's evolving investment goal appetite between this recent conversation: {message} \n and the past Client report with their old appetite: {history}[/INST]" elif task == "genBP": print("generate BP") prompt += f"[INST]Generate a JSON following this exact structure:\n {bp_ground_struct} \n -and adapt name, blueprint, metamarkers cues parameters based on the following name and description: {message}. \n - The metamarkers cues should be detectable from written interactions.[/INST]" elif task == "opti": print("Experimental Opti") prompt += f"[INST]Your task is to rewrite the given text in 5 sentences maximum, keeping only the most crucial information based on the provided blueprint. Follow these instructions carefully: Read through the entire blueprint. Familiarize yourself with all items, including questions, statements, and topics. Analyze the given text and identify ONLY the parts that are DIRECTLY relevant to the blueprint items with higher than 0 weights. Completely disregard any information that doesn't align with these high-weight items, even if it seems important in the original context. Rewrite the text in 5 sentences or fewer, ensuring that you: a. Include ONLY the most crucial information based on the blueprint items with higher weights. b. Maintain the original meaning and context of the relevant information. c. Use clear and concise language. d. Include important figures and specific details that are directly related to the high-weight blueprint items. Do not add any new information, commentary, or analysis that isn't present in the original text AND directly relevant to the high-weight blueprint items. If you find no relevant information for any high-weight blueprint item, return an empty string. Do not include any explanations, metadata about your process, or irrelevant details that doesn't directly address blueprint items above 0 weight. Remember: Focus EXCLUSIVELY on information that is directly relevant to the blueprint items with higher weights. Completely ignore any information, no matter how interesting or contextually important, if it doesn't directly address the high-weight blueprint items. Maintain the accuracy of the original information while optimizing for brevity. Do not make assumptions or inferences beyond what is explicitly stated in the text and directly relevant to the high-weight blueprint items. Ensure your output is exactly 5 sentences or fewer, or an empty string if no relevant information is found. Text: {message} Blueprint: {history}[/INST]" elif task == "gen rationale": print("gen rationale") prompt += f"[INST] Do NOT directly quote the instructions. Criteria: {metamarker} Proof: {proof}. Base your answer on the Proof. In YOUR OWN WORDS, provide a 2 SENTENCE plain language rationale no more than 40 words long explaining why this Outcome occurred. Outcome: '{outcome}'. At the end, list the decision and important dates, cite the important figures and scores supporting the Outcome. [/INST]" elif task == "copilot insight": print("gen copilot insight") #Format message # Normalize whitespace message = " ".join(message.split()) # Remove extra line breaks message = re.sub(r"\n\s*", " ", message) # Strip extra spaces between words message = re.sub(r"\s\s+", " ", message) # Remove hyphen bullet points message = re.sub(r"\-\s*", "", message) # Standardize space around punctuation message = re.sub(r"\s+([\.?,!:])", r"\1", message) prompt += f"[INST]{copilot_instruct_global} \n\n Background: {history}\n\n My Query: {message}\n\n[/INST]" else: print("Other Task") #print("Conversation: ", message) prompt += f"[INST]You are my dedicated advisor, You will try to help me find the best possible solution to my request. Let's have a chat.\n\n- Don't repeat yourself.\n- Keep your messages relatively short.\n{message}[/INST] " return prompt async def generate(prompt, history, task, temperature, max_new_tokens, top_p, repetition_penalty, presence_penalty, metamarker=None, proof=None, outcome=None): print("In generate", prompt) if temperature == None and task == None: task = "" temperature = 0.9 max_new_tokens = 250 top_p = 0.9 repetition_penalty = 1.0 presence_penalty = 1.5 payload = { 'inputs': format_prompt(prompt, history, task, metamarker, proof, outcome), 'parameters': { 'temperature': temperature, 'max_new_tokens': max_new_tokens, 'top_p': top_p, 'repetition_penalty': repetition_penalty, 'presence_penalty': presence_penalty, 'generator': True, 'do_sample': False, 'use_cache': True } } print("Payload: ", payload) #demo.update() #state.profile_scores = scores cleaned_text = "" try: output = await query(payload) #print("Output: ", output[0]["generated_text"]) #Remove everything before and including [/INST] cleaned_text = re.sub(r'^.*?\[/INST\]\s*', '', output[0]["generated_text"], flags=re.DOTALL) # Remove any remaining [INST] or [/INST] tags cleaned_text = re.sub(r'\[/?INST\]', '', cleaned_text) # Remove any leading/trailing whitespace cleaned_text = cleaned_text.strip() #print("Cleaned Output: ", cleaned_text) except Exception as e: print(e) return "Error: API request unsuccessful. Please wait a few minutes and try again." try: return cleaned_text #return output[0]["generated_text"] except Exception as e: print(e) return output[0] ### Generate Blueprint ### async def generate_bp(blueprint_title, blueprint_description): print("In Generate Blueprint") try: bp_direction = f"name: {blueprint_title}\nDescription: {blueprint_description}" #bp_direction = f"{blueprint_description}" blueprint_json = await generate(bp_direction, "", "genBP", 0.9, 1097, 0.9, 1, 1.1) blueprint_json = re.sub(r'^[^{}]*', '', blueprint_json) # Remove before the first blueprint_json = re.sub(r'}[^{}]*$', '}', blueprint_json) # Remove after the last #print("Blueprint JSON: ", blueprint_json) # load text to a json object blueprint_json = json.loads(blueprint_json) #print("Blueprint JSON loaded: ", blueprint_json) return blueprint_json except Exception as e: print(e) return "Add a Blueprint Title and Description" # Create the initial figure fig = go.Figure( layout=dict( title='Blueprint Scatter Graph', xaxis_title='Timestamp', yaxis_title='Score'), ) # Define marker properties marker_size = 10 marker_color = [0] # Generate random marker colors marker_colorscale = 'Plasma' # Choose a color scale show_marker_scale = True # Create the initial trace with placeholder marker properties initial_trace = go.Scatter(x=[datetime.now(), datetime.now()], y=[0, 10], mode='markers', showlegend=False,) # Add the marker properties to the initial trace initial_trace.update(marker=dict( cmax=10, cmin=0, size=1, color=marker_color, colorscale=marker_colorscale, showscale=show_marker_scale )) async def ocr_file_base64(file_path): print("In OCR") with open(file_path, 'rb') as file: encoded_file = base64.b64encode(file.read()).decode('utf-8') print("Encoded File: ", encoded_file) # Chunk encoded file as it's too large for API #chunk_size = 1900000 # 1.9MB - Ensure this is divisible by 4 data = { "inputs": "", "extract_text": encoded_file } response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers) response = await response.json() return response async def nox_audio_base64(audio_file_path): print("In Nox Audio") # Open the audio file in binary mode with open(audio_file_path, "rb") as audio_file: # Read the contents of the file encoded_audio = base64.b64encode(audio_file.read()).decode('utf-8') #print("Encoded File: ", encoded_audio) data = ({ "inputs": "", "get_nox": encoded_audio }) # Calling API with payload response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers) response = await response.json() return response[1] # Add the initial trace to the figure fig.add_trace(initial_trace) async def update_scores(prompt, blueprint): try: print("In update_scores") prnt("Prompt: ", prompt) blueprint = json.loads(blueprint) bp_result = await query_profile(prompt, blueprint) #print("Blueprint Result", bp_result) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) if score < 0: score = 0.00 # empty bp_result[1]["mw_score"] to avoid error bp_result[1]["mw_score"] = {} scores = [score] #print("Scores: ", scores) for i in range(len(scores)): #print("Individual scores: ", scores[i]) new_trace = go.Scatter(x=[datetime.now()], y=[float(scores[i])*10], mode='markers', showlegend=False, marker=dict( cmax=10, cmin=0, size=10, color=[float(scores[i])*10] , #set color equal to a variable colorscale='Plasma', # one of plotly colorscales showscale=False ) ) fig.add_trace(new_trace) return score, bp_result[1]["mw_score"], gr.Plot(fig) except Exception as e: print(e) return "Select a Blueprint First", "", gr.Plot(fig) def plot_timeline(files): if not files: # If no files, return an empty figure # Create the initial figure fig = go.Figure( layout=dict( title='No Files Uploaded', xaxis_title='Date', yaxis_title='File Name', yaxis_tickfont=dict(size=10), xaxis_tickformat='%Y-%m-%d', hovermode='closest'), ) return fig fig = go.Figure( layout=dict( title='File Timeline', xaxis_title='Date', yaxis_title='File Name', yaxis_tickfont=dict(size=12), xaxis_tickformat='%Y-%m-%d', hovermode='closest'), ) for file in files: file_path = file.name file_stats = os.stat(file_path) creation_date = datetime.fromtimestamp(file_stats.st_ctime) modification_date = datetime.fromtimestamp(file_stats.st_mtime) print("Creation Date: ", creation_date) print("Modification Date: ", modification_date) fig.add_trace(go.Scatter( x=[creation_date, modification_date], y=[os.path.basename(file_path), os.path.basename(file_path)], mode='markers', showlegend=False, marker=dict(color=['green', 'blue'], colorscale='Plasma'), text=[f'Created: {creation_date.date()}', f'Modified: {modification_date.date()}'], hoverinfo='text', line=dict(color='black'), name=os.path.basename(file_path) )) fig.update_layout(showlegend=False, width=850, height=400, hovermode='closest') return fig def sliding_window_sentences(text, window_size=3, step_size=1): print("In sliding_window") """ Args: text (str): The text to split. window_size (int): Number of sentences in each window. step_size (int): Number of sentences to shift the window. Yields: list: A list of stripped sentences representing the current window. """ sentences = [] for i, char in enumerate(text): if char in '.?!\n': sentences.append(char) if len(sentences) == window_size: yield [sentence.strip() for sentence in sentences[:window_size]] sentences = sentences[step_size:] if sentences: yield [sentence.strip() for sentence in sentences] global_total_score = 0 global_num_sentences = 0 global_meta_scores = [] global_metamarker_averages = {} scored_sentences = [] #Format Text and call model async def format_text(text, fileName, blueprint): print("In format_text") global global_total_score global global_num_sentences global global_meta_scores global global_metamarker_averages global scored_sentences #lowercase #text = text.lower() #remove punctuation #text = text.translate(str.maketrans('', '', string.punctuation)) #remove whitespace #remove empty strings #text = list(filter(None, text)) # Define a regex pattern for splitting sentences #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n)') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.)(?<!\d\.)(?=\d)|(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))|(?<!\d)\.(?!\d)') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)\s') # Use the pattern to split the text into sentences sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)') sentences = sentence_pattern.split(text) # with nltk #sentences = nltk.sent_tokenize(text) #windows = sliding_window_sentences(text) #sentences = list(windows) print("Sentences: ", sentences) scored_sentences_str = [] total_score = 0 num_sentences = 0 meta_sums = defaultdict(float) meta_counts = defaultdict(int) # for each sentence run Model for sentence in sentences: # Skip empty sentences if not sentence.strip(): continue bp_result = await query_profile(sentence, blueprint) #print("Blueprint Result", bp_result) #print("Blueprint Nuances Scores", bp_result[2]["n_scores"]) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) if score >= 0: total_score += score num_sentences += 1 metamarkers_score = bp_result[1]["mw_score"] #print("Metamarkers Scores: ", metamarkers_score) for key, value in metamarkers_score.items(): #print("Metamarkers Key: ", key) #print("Metamarkers Value: ", value) meta_sums[key] += value meta_counts[key] += 1 #print("Metamarkers Sums: ", meta_sums) # Create a string with the metamarkers scores meta_str = "" for key, value in metamarkers_score.items(): meta_str += f"{key}: {value} | " #print("Metamarkers String: ", meta_str) nuances_score = bp_result[2]["n_scores"] #print("Nuances Scores: ", nuances_score) #Trim trailing | meta_str = meta_str[:-3] # append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName scored_sentences_str.append([sentence, score, meta_str]) scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName]) # Calculate averages ## scores if num_sentences > 0: avg_score = total_score / num_sentences else: avg_score = 0 ## metamarkers meta_avgs = {} for key in meta_sums.keys(): meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2) #print("Metamarkers Averages: ", meta_avgs) # Update globals global_total_score += avg_score global_meta_scores.append(meta_avgs) meta_avg_str = "" for key, value in meta_avgs.items(): meta_avg_str += f"    {key}: {value} |</br>" # Trim last | meta_avg_str = meta_avg_str[:-3] #text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>" text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary" # comment to hide details #text += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br>   -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str]) #text += "</details></br> <b>-- End of File -- </b></br></br></br>" text = "Start of File: "+fileName+"\n\n"+re.sub('\s+',' ', text)+"\n\n -- End of File -- \n\n\n" return text triggers_list = [] num_triggers = 0 # Extract insight async def extract_insight(files, blueprint, get_rationale = False, threshold_enabled = False, threshold_range = None, threshold_operator = None, n_dependency = None, flash_attn = False): print("In extract_insight") global start_time # Start the timer start_time = time.time() blueprint = json.loads(blueprint) # reset global values for new batch global global_total_score global global_meta_scores global global_metamarker_averages global scored_sentences global_total_score = 0 global_meta_scores = [] global_metamarker_averages = {} scored_sentences = [] global triggers_list global num_triggers #print("Triggers List: ", triggers_list) #print("Num Triggers: ", num_triggers) if files == None: end_time = time.time() # Record end time elapsed_time = end_time - start_time # Print elapsed time to console (or update a Gradio component) print(f"Function took {elapsed_time:.2f} seconds to execute") return "Please Upload a Supported File First" , "" try: batched_text = "" for file in files: file_path = file.name file_name = os.path.basename(file.name) if file_path.endswith('.pdf'): text = "" text = await ocr_file_base64(file_path) print("OCR Text extracted: ", text) elif file_path.endswith('.docx'): document = docx.Document(file_path) text = "" text = "\n".join(paragraph.text + " " for paragraph in document.paragraphs) elif file_path.endswith('.mp3') or file_path.endswith('.wav'): text = "" text = await nox_audio_base64(file_path) else: end_time = time.time() # Record end time elapsed_time = end_time - start_time # Print elapsed time to console (or update a Gradio component) print(f"Function took {elapsed_time:.2f} seconds to execute") return "Unsupported File Format", "Supported: PDF, TXT, DOCX, MP3, WAV" #print("Text extracted: ", text) if flash_attn: # text should go through Opti instructions first print("Text before Opti: ", text) # clean blueprint into digestable String processed_blueprint = [] processed = [] # Process blueprint items and metamarkers print("Blueprint: ", blueprint) """for metamarkers in blueprint['metamarkers']: for marker, details in metamarkers.items(): if marker != 'Weight': processed.append(f"{marker}") for nuance in details: for key, value in nuance.items(): processed.append(f"{key}, Score: {value}") processed_blueprint = "\n".join(processed) print("Processed Blueprint: ", processed_blueprint)""" # if text longer than 100k charac, split it into chunks with end of sentence uncut if len(text) > 100000: print("Text longer than 100000 char") chunks = [] current_chunk = "" sentences = re.split(r'(?<=[.!?])\s+', text) for sentence in sentences: if len(current_chunk) + len(sentence) <= 100000: current_chunk += sentence + " " else: chunks.append(current_chunk.strip()) current_chunk = sentence + " " if current_chunk: chunks.append(current_chunk.strip()) # Process each chunk processed_text = "" for chunk in chunks: processed_chunk = await generate(chunk, blueprint, "opti", 0.1, 10000, 0.2, 1.1, 0.1) processed_text += processed_chunk + " " text = processed_text.strip() print("Text after Opti: ", text) else: text = await generate(text, blueprint, "opti", 0.1, 10000, 0.2, 1.1, 0.1) print("Text after Opti: ", text) batched_text += await format_text(text, file_name, blueprint) #bp_result = await query_profile(batched_text, blueprint) #score = round((float(bp_result[0]["BlueprintScore"])),2) # After processing all files num_files = len(files) # divide global total avg_score by number of files global_avg_score = global_total_score / num_files #print("Global Average Score: ", global_avg_score) for meta_scores in global_meta_scores: # Iterate over each metamarker in the file for metamarker, score in meta_scores.items(): # Update the running total for each metamarker global_metamarker_averages.setdefault(metamarker, 0) # divide global total avg_score by number of files global_metamarker_averages[metamarker] += score / num_files # round to 2 decimals global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker], 2) # Calculate the average for each metamarker gradients = blueprint["gradients"] sorted_gradients = sorted(blueprint["gradients"], key=lambda x: list(x.values())[0]) for metamarker in global_metamarker_averages: #global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker] / num_files, 2) #print("Global Metamarkers Averages: ", global_metamarker_averages) global_meta_avg_str = "" rows = [] for key, value in global_metamarker_averages.items(): global_meta_avg_str += f"    {key}: {value} |</br>" trigger_outcome = "Uncat" highest_gradient = 0 for gradient in gradients: for gradient_key, gradient_value in gradient.items(): if value >= gradient_value and gradient_value >= highest_gradient: trigger_outcome = gradient_key highest_gradient = gradient_value row = { "Metamarkers": key, "Output": value, "Gradient Match": trigger_outcome, "Rationale": "", "Source": "" } rows.append(row) # Iterate through each scored sentence and update the Rationale lower_bound = 0.0 higher_bound = 1.0 rationale_gen = "" document_proof = [] document_name = "" previous_doc = "" tracked_nuances = {} nuance_weight = 0 global_avg_score = 0 metamarkers_global_weights = 0 for sentence_data in scored_sentences: sentence = sentence_data[0] sentence_scores = sentence_data[2] sentence_n_scores = sentence_data[3] document_name = sentence_data[4] #print("Sentence nuances scores: ", sentence_n_scores) #HACK to add to model handler as parameter # for each sentence get the highest score of the metamarkers nuance score sum it to itself. if the nuance was already found as highest score from the previous sentence in this metamarker, don't sum it up # add find and break option # check if previous nuance is similar but current score is bigger then keep the biggest one overriding the previous one with the same nuance key by removing its score from the total. if n_dependency: # reset global avg score for metamarker in sentence_scores: #sentence_scores[metamarker] = 0 top_nuance = max(sentence_n_scores[metamarker], key=sentence_n_scores[metamarker].get) #print("Top Nuance Score: ", sentence_n_scores[metamarker][top_nuance]) # if sentence_n_scores[metamarker][top_nuance] exist and is bigger than previous sentence_n_scores[metamarker][top_nuance] if top_nuance not in tracked_nuances.get(metamarker, []): print("Top Nuance not tracked yet or new nuance score greater than previous") tracked_nuances.setdefault(metamarker, []).append(top_nuance) # this score is correct #print(f"Sentence Score Before adding: {sentence_scores[metamarker]}") #print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}") # get the nuance weight from the blueprint associated with the top_nuance and metamarker #print("Blueprint metamarkers: ", blueprint["metamarkers"]) top_nuance_strip = top_nuance.rstrip('.') for metamarker_dict in blueprint["metamarkers"]: if metamarker in metamarker_dict: categories = metamarker_dict[metamarker] for category in categories: if top_nuance_strip in category: nuance_weight = category[top_nuance_strip] break break #print(f"Nuance Weight {nuance_weight}") sentence_scores[metamarker] += nuance_weight * sentence_n_scores[metamarker][top_nuance] #sentence_scores[metamarker] += sentence_n_scores[metamarker][top_nuance] #print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}") if sentence_n_scores[metamarker][top_nuance] > sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]: #print("Top Nuance not tracked yet or new nuance score greater than previous") tracked_nuances.setdefault(metamarker, []).append(top_nuance) # this score is correct #print(f"Sentence Score Before adding: {sentence_scores[metamarker]}") #print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}") # get the nuance weight from the blueprint associated with the top_nuance and metamarker #print("Blueprint metamarkers: ", blueprint["metamarkers"]) top_nuance_strip = top_nuance.rstrip('.') for metamarker_dict in blueprint["metamarkers"]: if metamarker in metamarker_dict: categories = metamarker_dict[metamarker] for category in categories: if top_nuance_strip in category: nuance_weight = category[top_nuance_strip] break break #print(f"Nuance Weight {nuance_weight}") #TODO # if new sentence_n_scores[metamarker][top_nuance] is bigger than the tracked previous one for the same nuance then remove the previous score from sentence_scores[metamarker] and then += the new one with the associated nuance weight # add the score of the top nuance to the metamarker score multiplied by the blueprint nuance weight # remove the previous sentence_n_scores[metamarker][top_nuance] from sentence_scores[metamarker] and then += the new one with the associated nuance weight #print("New Nuance Score is bigger than previous one") #print(f"Previous Nuance Score: {sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}") #print(f"New Nuance Score: {sentence_n_scores[metamarker][top_nuance]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}") sentence_scores[metamarker] -= nuance_weight * sentence_n_scores[metamarker][tracked_nuances[metamarker][0]] sentence_scores[metamarker] += nuance_weight * sentence_n_scores[metamarker][top_nuance] #print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}") # Iterate through metamarkers #print("Sentence Data Score before loop: ", sentence_scores) for metamarker, score in sentence_scores.items(): #print("Sentence Data Score in loop to update Dataframe: ", sentence_scores) # Find the corresponding row in the table #print(f"Metamarker: {metamarker} | Score: {score} | Sentence: {sentence}") for i, row in enumerate(rows): if row["Metamarkers"] == metamarker: # Check if the sentence score falls between the current and next gradient values lower_bound, higher_bound = find_lower_and_higher(row["Output"], sorted_gradients.copy()) #print("Lower Bound: ", lower_bound) #print("Higher Bound: ", higher_bound) # if threhold enabled, if sentence score is above threshold_range then change Output value to sentence score, stop looping through metamarkers if threshold_enabled: print("Threshold Enabled") #print("Threshold Range: ", threshold_range) threshold_range_value = list(threshold_range.values())[0] if eval(str(score) + threshold_operator + str(threshold_range_value)): #print(f"Score {threshold_operator} {threshold_range_value}") # Split source into docs docs = row["Source"].split("\n") # Remove empty strings docs = list(filter(None, docs)) # Check if current doc seen if document_name not in row["Source"]: row["Source"] += f" File: {document_name}  " # Append the sentence to the "Rationale" with appropriate formatting score = round(score, 3) # Check if Output is already assigned if "Output" not in row or eval(str(score) + threshold_operator + str(row["Output"])): if score > 1: score = 1 row["Output"] = score trigger_outcome = "Uncat" highest_gradient = 0 for gradient in gradients: for gradient_key, gradient_value in gradient.items(): if row["Output"] >= gradient_value and gradient_value >= highest_gradient: trigger_outcome = gradient_key highest_gradient = gradient_value #print("Trigger Outcome: ", trigger_outcome) #print("Highest gradient: ", highest_gradient) row["Gradient Match"] = trigger_outcome #print("Score: ", score) row["Rationale"] += f"{sentence} | \n " #row["Rationale"] += f"{sentence} {score} |  " break elif score >= lower_bound and score <= higher_bound: # Split source into docs docs = row["Source"].split("\n") # Remove empty strings docs = list(filter(None, docs)) # Check if current doc seen if document_name not in row["Source"]: row["Source"] += f" File: {document_name}  " # Append the sentence to the "Rationale" with appropriate formatting score = round(score, 3) row["Rationale"] += f"{sentence} {score} |  " print("Updated Rationale:", row["Rationale"]) # append to document_proof the sentence and its score for associated to the metamarker and the fileName #document_proof.append([sentence, score, metamarker, sentence_data[3]]) #print("Document Proof: ", document_proof) break # Stop searching for the row once found #print("Sentence Scores: ", sentence_scores) # Generate written rationale if get_rationale or flash_attn: for row in rows: # if rationale is not empty if row["Rationale"] != "": #TODO count characters length, if more than 100000 , then split it and generate the first part of the rationale # if rationale is longer than 100000 characters then split it in equal parts base don the total length and back to the previous punctuation and generate the first part of the rationale then loop through the rest of the parts and generate the rest of the rationale max_tokens = 20000 buffer = 1000 tokens = re.split(r'\W+',row["Rationale"]) if len(tokens) > max_tokens: print("Rationale is longer than 100000 characters") print("Rationale Length: ", len(row["Rationale"])) print("Rationale: ", row["Rationale"]) print("Rationale Type: ", type(row["Rationale"])) #rationale_parts = [row["Rationale"][i:i+max_tokens] for i in range(0, len(row["Rationale"]), max_tokens)] #print("Rationale Parts Split: ", rationale_parts) rationale = row["Rationale"] rationale_parts = [] while len(rationale) > 0: part = rationale[:max_tokens] rationale_parts.append(part) rationale = rationale[max_tokens:] generated_rationales = [] for part in rationale_parts: generated = await generate("", "", "gen rationale", 0.3, 1300, 0.3, 1.15, 0, row["Metamarkers"], part, row["Gradient Match"]) generated_rationales.append(generated) full_rationale = " ".join(generated_rationales) row["Rationale"] = full_rationale else: row["Rationale"] = await generate("", "", "gen rationale", 0.3, 1300, 0.3, 1.15, 0, row["Metamarkers"], row["Rationale"], row["Gradient Match"]) #print("Global Metamarker Avg.: ", global_metamarker_averages.items()) # Uncomment to obtain manual Triggers """ if num_triggers>0: for key, value in global_metamarker_averages.items(): trigger_outcome = "" #print("Num Triggers: ", num_triggers) for i in range(num_triggers): trigger = triggers_list[i] name = trigger[0] operator = trigger[1] threshold = trigger[2] if eval(str(value) + operator + str(threshold)): if trigger_outcome == "": trigger_outcome = name else: trigger_outcome += ", " + name row = { "Metamarkers": key, "Meta_Score": value, "Trigger Outcome": trigger_outcome, "Rationale": "" } rows.append(row) else: for key, value in global_metamarker_averages.items(): row = { "Metamarkers": key, "Meta_Score": value, "Trigger Outcome": "", "Rationale": "" } rows.append(row) data_for_dataframe = pd.DataFrame(rows) """ # sum the total of row["Output"] from each rows print("Rows: ", rows) for row in rows: print("Row: ", row) for metamarkers in blueprint["metamarkers"]: print("Metamarkers: ", metamarkers) if row["Metamarkers"] in metamarkers: metamarkers_global_weights+= metamarkers["Weight"] print("Metamarkers Global Weights: ", metamarkers_global_weights) global_avg_score += row["Output"] * metamarkers["Weight"] print("Global Average Score: ", global_avg_score) global_avg_score = global_avg_score/metamarkers_global_weights global_avg_score_gradient = "" # round global_avg_score to 2 decimals global_avg_score = round(global_avg_score, 3) #based on global_avg_score find the blueprint gradient it falls into for gradient in sorted_gradients: for gradient_key, gradient_value in gradient.items(): print("Gradient Key: ", gradient_key) if global_avg_score >= gradient_value: print("Global Average Score: ", global_avg_score) print("Gradient Value: ", gradient_value) global_avg_score_gradient = gradient_key break data_for_dataframe = pd.DataFrame(rows) #print("Dataframe: ", data_for_dataframe) # Trim last | global_meta_avg_str = global_meta_avg_str[:-3] print("Metamarkers Global Weights: ", metamarkers_global_weights) insight = "" insight = f"<h4>Gradient Match Outcome: <b>{global_avg_score_gradient}</b></br>Global Weighted Average: <b>{global_avg_score}</b></h4>" #insight = f"<b>Batch General Score: {global_avg_score:.3f}</b> </br></br><b>Batch Metamarkers General Score: </b> </br> {global_meta_avg_str} </br></br></br></br> <details><summary><b>Insight Details </b></summary></br>{batched_text}</details>" # make df_dl_btn visible #dl_df_btn = gr.Button(value ="Download Data Frame πŸ“₯", visible=True, interactive= True) # Save DataFrame to CSV file #csv_filename = "./b_df_export.csv" #data_for_dataframe.to_csv(csv_filename, index=True) #print("ABS Path to CSV File: ", os.path.abspath(csv_filename)) #dfcsv = gr.File(csv_filename, label="Export DataFrame", scale=5, visible=False) #copy_df_btn = gr.Button(value ="Copy Data Frame πŸ“‹", visible=True, interactive= True, scale=1) save_df_btn = gr.Button(value ="Save Data Frame Result πŸ’Ύ", visible=True, interactive= True, scale=1) # End the timer and calculate the elapsed time end_time = time.time() elapsed_time = end_time - start_time # Format the elapsed time to display it in the terminal elapsed_time_str = f"Function execution time: {elapsed_time:.2f} seconds" return insight, data_for_dataframe, save_df_btn, elapsed_time_str except Exception as e: print(e) # End the timer and calculate the elapsed time end_time = time.time() elapsed_time = end_time - start_time # Format the elapsed time to display it in the terminal elapsed_time_str = f"Function execution time: {elapsed_time:.2f} seconds" return "Pick a Blueprint" def find_lower_and_higher(score, gradients): # Sort gradients based on values #print("In find_lower_and_higher") lower_bound = 0 higher_bound = 1 for gradient in gradients: label, value = list(gradient.items())[0] if value <= score: lower_bound = max(lower_bound, value) elif value > score: higher_bound = min(higher_bound, value) return lower_bound, higher_bound async def get_bp(bp_selector): print("In get_bp") #print("Selected Blueprint: ", bp_selector) bp_structure = await get_bp_structure(bp_selector) threshold_enabled = gr.Checkbox(label="Conditional Threshold πŸ”", info="Enable to isolate and focus on specific Outcomes", interactive= True) extract_insight_btn = gr.Button(value ="Extract Insight 🧠", interactive= True) # get blueprint name print("Blueprint Structure: ", bp_structure) bp_name = bp_structure['name'] #get blueprint's blueprint in a List bp_blueprint = bp_structure['blueprint'] bp_gradient = bp_structure['gradients'] phantom_description = f"<center> <h3>Multilingual Interactions Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. </br><b>πŸ”’ No data retained nor future training required.</b> </p> </center> </br><p><b>Protocole:</b> '{bp_name}' </p><p><b>Markers:</b> '{bp_blueprint}'</p><p><b>Scale:</b> '{bp_gradient}'</p>" scale_gradients = bp_gradient colors = { 0: "#F7FFF7", 0.25: "#FDE0DD", 0.5: "#21918c", 0.75: "#5ec962", 1: "#fde725" } html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>" for gradient in scale_gradients: label, value = list(gradient.items())[0] color = colors[min(colors.keys(), key=lambda x: abs(x - value))] html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>" html_content += "</div>" return bp_selector, gr.Code(value=json.dumps(bp_structure, indent=2, ensure_ascii=False)), phantom_description, html_content, threshold_enabled, extract_insight_btn ## Gradio Interface## max_triggers = 3 pd.set_option('display.max_colwidth', None) pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) saved_insight_df = pd.DataFrame() saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) def variable_outputs(k): global num_triggers k = int(k) num_triggers = k outputs = [] for i in range(max_triggers): if i < k: visible = True else: visible = False outputs.append(gr.Textbox(visible=visible, interactive= True)) outputs.append(gr.Radio(["<", ">"], visible=visible, interactive= True)) outputs.append(gr.Slider(visible=visible, interactive= True)) return outputs def get_triggers_list_values(trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9): #print("Triggers: ", trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9) global triggers_list # add triggers to a 2d list 3 by 3 triggers_list = [] triggers_list.append([trig1, trig2, trig3]) triggers_list.append([trig4, trig5, trig6]) triggers_list.append([trig7, trig8, trig9]) #print("Triggers List: ", triggers_list) def threshold_condition(tc, bp_structure): print("In TC") print("Threshold Condition: ", tc) bp_structure = json.loads(bp_structure) print("Blueprint Structure: ", bp_structure["gradients"]) threshold_range = gr.Dropdown(bp_structure["gradients"], label=f"Threshold Range",info="Select what should be filtered 'in' or 'out' from of the dataframe display", visible=True, interactive= tc) threshold_operator = gr.Radio(["<", ">"], label=f"Threshold Operator",visible=True, interactive= tc) n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable to avoid overlaps between metamarkers.", visible=True, interactive= tc) return threshold_range, threshold_operator, n_dependency def build_structure(title, gradients, blueprint, metamarkers): print("In BuildBP") structure = { "name": title, "gradients": [{"Significant": 1.0}], "blueprint": blueprint.splitlines(), "metamarkers": [ {item.split(":")[0].strip(): [{"Casual conversation": 1}] for item in metamarkers.splitlines() if ":" in item} ] } print("Structure: ", structure) # Function to handle button click and generate CSV def df_to_csv(df): print("In df_to_csv") # Save DataFrame to CSV file csv_filename = "b_df_export.csv" df.to_csv(csv_filename, index=False) fileobj = gr.File(csv_filename, label="Exported DataFrame", scale=5, visible=True) return fileobj def copy_df(df): print("In copy_df") # Copy DataFrame to clipboard print("Dataframe: ", df) df.to_clipboard(index=False) return "DataFrame copied to clipboard" def save_df(df): print("In save_df with: ", df) global saved_insight_df # save DF in different DF in Copilot Tab #saved_insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) #TODO Check if new_row exists in df #saved_insight_df = saved_insight_df.append(df) saved_insight_df = pd.concat([saved_insight_df, df]).drop_duplicates().reset_index(drop=True) empty_df_btn = gr.Button(value ="Empty Data Frame πŸ—‘οΈ", visible=True, interactive= True, scale=1) return saved_insight_df, empty_df_btn def empty_saved_df(): print("In empty_saved_df") global saved_insight_df saved_insight_df = pd.DataFrame() saved_insight_df_gr = gr.Dataframe(pd.DataFrame(), headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) return saved_insight_df_gr async def get_copilot_insight(prompt, df, temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct): global copilot_instruct_global global blueprint_structure_global copilot_instruct_global = copilot_instruct #transform blueprint_structure_global dict to string blueprint_structure_global = json.dumps(blueprint_structure_global) # add the json blueprint as a string to the instruct copilot_instruct = copilot_instruct + " \n Blueprint:" + blueprint_structure_global print("In get_copilot_insight") print("Prompt: ", prompt) print("DF: ", df) print("Blueprint in Get Insight: ", blueprint_structure_global) print("Copilot Instruct: ", copilot_instruct) # get the value of the column rationale from the dataframe if df is None: return "No Data Provided" elif isinstance(df, str): background = df background = re.sub(r'<details>.*?</details>', '', background, flags=re.DOTALL) print("Rationale From String: ", background) else: background = df['Rationale'].astype(str).reset_index(drop=True).values print("Rationale From DF: ", background) # generate insight from dataframe copilot_insight = await generate(prompt, background, "copilot insight", temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider) return copilot_insight #HACK make redundant functions on API side and multithreaded async def annotate(feed, blueprint): fileName = "Feed" try: blueprint = json.loads(blueprint) print("Blueprint: ", blueprint) #results = await query_profile(feed, blueprint) global global_total_score global global_num_sentences global global_meta_scores global global_metamarker_averages global scored_sentences #lowercase #text = text.lower() #remove punctuation #text = text.translate(str.maketrans('', '', string.punctuation)) #remove whitespace #remove empty strings #text = list(filter(None, text)) # Define a regex pattern for splitting sentences sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)') # Use the pattern to split the text into sentences sentences = sentence_pattern.split(feed) # with nltk #sentences = nltk.sent_tokenize(text) #windows = sliding_window_sentences(text) #sentences = list(windows) scored_sentences_str = [] total_score = 0 num_sentences = 0 meta_sums = defaultdict(float) meta_counts = defaultdict(int) html = "" sentences_grounded = "" language_code = "" # for each sentence run Model for sentence in sentences: # Skip empty sentences if not sentence.strip(): continue language_code, flag = detect_lang(sentence) if language_code == "en": bp_result = await query_profile(sentence, blueprint) print("Text is English") else: bp_result = await query_profile(sentence, blueprint, lang=True) # Get the top level blueprint score blueprint_score = bp_result[0]["BlueprintScore"] print("Blueprint Score: ", blueprint_score) # Get the metamarker scores metamarkers = bp_result[1]["mw_score"] # Create the annotations annotations = {} mm_score_positive = False for mm, score in metamarkers.items(): annotations[mm] = (str(round(score, 2))) if score > 0: mm_score_positive = True print(annotations) print("Sentence: ", feed) # change the hexadecimal color based on the blueprint score going from gray-blue to green passing by red colors = { 0: "#F7FFF7", 0.25: "#FDE0DD", 0.5: "#21918c", 0.75: "#5ec962", 1: "#fde725" } if score <= 0.05: hex_color = colors[0] text_color = "#000000" elif score >= 1: hex_color = colors[1] text_color = "#FFFFFF" else: lower = max([x for x in colors.keys() if x <= score]) higher = min([x for x in colors.keys() if x >= score]) range = higher - lower progress = (score - lower) / range lower_color = colors[lower] higher_color = colors[higher] r1, g1, b1 = tuple(int(lower_color[i:i+2], 16) for i in (1, 3, 5)) r2, g2, b2 = tuple(int(higher_color[i:i+2], 16) for i in (1, 3, 5)) r = round(r1 + (r2 - r1) * progress) g = round(g1 + (g2 - g1) * progress) b = round(b1 + (b2 - b1) * progress) hex_color = "#{:02x}{:02x}{:02x}".format(r, g, b) # if hexa is dark then text color is white else black text_color = "#FFFFFF" if (r*0.299 + g*0.587 + b*0.114) < 186 else "#000000" highlighted = f"<mark style='background: {hex_color} !important; color: {text_color}'>{sentence}</mark>" #highlighted = f"<mark style='background: #00ced1 !important'> {sentence} </mark>" #print("Blueprint Result", bp_result) #print("Blueprint Nuances Scores", bp_result[2]["n_scores"]) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) # only count not grounded sentences if at least one annotation is greater than 0 if mm_score_positive: html += f"{highlighted}<i>    - Gradient Score: </i>{score}</br>    - <i>Metamarkers: </i>{annotations}</br> <img src='{flag}' width='20'/></br>" total_score += score num_sentences += 1 metamarkers_score = bp_result[1]["mw_score"] #print("Metamarkers Scores: ", metamarkers_score) for key, value in metamarkers_score.items(): #print("Metamarkers Key: ", key) #print("Metamarkers Value: ", value) meta_sums[key] += value meta_counts[key] += 1 #print("Metamarkers Sums: ", meta_sums) # Create a string with the metamarkers scores meta_str = "" for key, value in metamarkers_score.items(): meta_str += f"{key}: {value} | " #print("Metamarkers String: ", meta_str) nuances_score = bp_result[2]["n_scores"] #print("Nuances Scores: ", nuances_score) #Trim trailing | meta_str = meta_str[:-3] # append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName scored_sentences_str.append([sentence, score, meta_str]) scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName]) else: # if score 0 or less, put the sentence in a list of sentences grounded sentences_grounded += f"{highlighted}</br> - Gradient Score: </i>{score}</br>    - <i>Metamarkers: </i>{annotations}</br>" # Calculate averages ## scores if num_sentences > 0: avg_score = total_score / num_sentences else: avg_score = 0 ## metamarkers meta_avgs = {} for key in meta_sums.keys(): meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2) #print("Metamarkers Averages: ", meta_avgs) # Update globals global_total_score += avg_score global_meta_scores.append(meta_avgs) meta_avg_str = "" for key, value in meta_avgs.items(): meta_avg_str += f"    {key}: {value} |</br>" # Trim last | meta_avg_str = meta_avg_str[:-3] #feed_details = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>Global Feed Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>" feed_details = f"<b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary" # comment to hide details feed_details += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br>   -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str]) feed_details += "</details></br> <b>-- End of Feed -- </b></br></br></br>" feed_details = "\n\n"+re.sub('\s+',' ', feed_details)+"\n\n\n" # format html in a better way with a header first html = f"<h3>Feed Insight</h3></br>{html} </br> </br><details>" html += f"<b>Grounded Segments: </b></br>{sentences_grounded}</details>" return html, feed_details, round(avg_score,2), meta_avgs except Exception as e: print(e) return sentence async def annotate_url(url, blueprint): try: #response = requests.get(url) #response.raise_for_status() #soup = BeautifulSoup(response.content, 'html.parser') # Extract text based on your HTML structure: #all_text = soup.get_text() # Gets all visible text #text_from_paragraphs = [p.get_text() for p in soup.find_all('p')] # Text from <p> tags #print("All text:",all_text) #print("Text from Ps in a list", text_from_paragraphs) #concatenated_text = " ".join(text_from_paragraphs) #print("Concatenated Text: ", concatenated_text) #return await annotate(concatenated_text, blueprint) return "Fetaure Coming" except Exception as e: print(e) return "Error fetching URL" def detect_lang(text): """Detect Language""" try: language_code, _ = langid.classify(text) # Map language codes to emoji flags lang_to_flag = { 'en': 'https://flagcdn.com/w40/us.png', 'es': 'https://flagcdn.com/w40/es.png', 'fr': 'https://flagcdn.com/w40/fr.png', 'de': 'https://flagcdn.com/w40/de.png', 'it': 'https://flagcdn.com/w40/it.png', 'pt': 'https://flagcdn.com/w40/br.png', 'ru': 'https://flagcdn.com/w40/ru.png', 'ja': 'https://flagcdn.com/w40/jp.png', 'ko': 'https://flagcdn.com/w40/kr.png', 'zh': 'https://flagcdn.com/w40/cn.png', 'hi': 'https://flagcdn.com/w40/in.png', 'pt-br': 'https://flagcdn.com/w40/br.png', 'uk': 'https://flagcdn.com/w40/gb.png', } flag = lang_to_flag.get(language_code, 'https://blankstate.ai/wp-content/uploads/2024/01/w.gif') print("Flag: ", flag) # use emoji library and code to display the actual emoji flag #flag = emoji.emojize(flag, language='alias') except: language_code = 'en' flag = 'https://blankstate.ai/wp-content/uploads/2024/01/w.gif' return language_code, flag additional_inputs = [ gr.Textbox(label="Task", lines=1, value="", interactive= False, visible= False), gr.Slider(label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False), gr.Slider(label="Max new tokens", value=250, minimum=0, maximum=1097, step=64, interactive=True, visible= False), gr.Slider(label="Top-p", value=0.90, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False), gr.Slider(label="Repetition penalty", value=1.0, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False), gr.Slider(label="Presence penalty", value=1.5, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False), ] css = "style.css" company_logo = "https://blankstate.ai/wp-content/uploads/2023/11/logo_blankstate.ai_dark.png" #ey_company_logo = "https://bberry.ai/wp-content/uploads/2023/09/EY.png" company_banner = "https://blankstate.ai/wp-content/uploads/2023/11/Plan-de-travail-16@2x-8.png" """blueprint_list = [ ["AML High Risk Client Identification","bp_10_AML"], ["Corporate Governance", "bp_4"], ["Corporate Policies", "bp_1"], ["Client Service Assessment", "bp_9_CSA"], ["Risk Profile", "bp_0"], ["Financial Reporting Compliance", "bp_3"], ["Legal Breach Monitoring","bp_2"], ["Portfolio Analysis", "bp_8_PA"], ["Reputation", "bp_5"], ["Client Satisfaction", "bp_11_CH"], ["Fact Finder Requirements", "bp_12_RR_FF"] ]""" blueprint_list = [ ["Fact Find", "bp_12_RR_FF"], ["Risk Profile", "bp_13_RR_RP"], ["Suitability Report", "bp_14_RR_SR"], ["Ongoing Services Review", "bp_23_OSR"], ["Financial Services Complaints Management", "bp_22_FSCM"], ["Emerging Markets Equity Risk Analysis", "bp_15_RR_EMERA"], ["Emerging Markets Equity Risk Analysis - BR", "bp_15_RR_EMERA_BR"], ["Emerging Markets Equity Risk Analysis - HI", "bp_15_RR_EMERA_HI"], ["Emerging Markets Equity Risk Analysis - ZH", "bp_15_RR_EMERA_ZH"], ["Corporate Policies", "bp_1"], ["Corporate Policies - BR", "bp_16_COP_BR"], ["Corporate Policies - ZH", "bp_16_COP_ZH"], ["DORA", "bp_17_DORA"], ["Behavioural Market Segmentation", "bp_21_BMS"] ] """scale_gradients = [ {"Significant": 0.9}, {"High": 0.7}, {"Moderate": 0.5}, {"Low": 0.3}, {"Grounded": 0.1} ]""" scale_gradients =[{"Select a Blueprint": 0.0}] colors = { 0: "#F7FFF7", } html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>" for gradient in scale_gradients: label, value = list(gradient.items())[0] color = colors[min(colors.keys(), key=lambda x: abs(x - value))] html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>" html_content += "</div>" # phantom description on market sentiment and behaviour analysis in emerging markets phantom_description = f"<center> <h3>Multilingual Behaviour Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. </br><b>πŸ”’ No data retained nor future training required.</b> </p> </center>" with gr.Blocks(theme=gr.themes.Base()) as demo: # add gr.header where error message will be displayed with gr.Row(): with gr.Column(scale=2): with gr.Tab('Blueprint Protocole'): with gr.Row(): with gr.Column(scale=1): with gr.Group(): bp_selector = gr.Dropdown(blueprint_list, value="blueprint_list", label="Available Blueprints", interactive= True, visible= True, allow_custom_value=True) blueprint_textbox = gr.Textbox(label="Blueprint", value="", interactive= True, visible= False) with gr.Accordion("See Blueprint Details", open=False, visible=True): blueprint_structure = gr.Code(label="Selected Blueprint Structure", interactive=True, visible= True, language="json", lines=30) #blueprint_structure = gr.Dataframe(label="Selected Blueprint Structure", interactive= False, visible= True) """with gr.Tab('Generate a Blueprint'): with gr.Column(scale=2): with gr.Tab('Blueprint by Definition πŸ”'): with gr.Row(): blueprint_title = gr.Textbox(label="Title", value="Employees Wellness", interactive= True, visible= True) blueprint_description = gr.Textbox(label="Description", value="Detecting employees feeling and sentiment", interactive= True, visible= True) with gr.Tab('Blueprint by Query πŸ€” (coming)'): blueprint_query = gr.Textbox(label="Question", placeholder="[coming soon]", interactive= False, visible= True) gen_gb_btn = gr.Button(value ="Generate Blueprint ✍️", interactive= True, visible= True) blueprint_generated = gr.JSON(label="Generated Structure", interactive= False, visible= True, lines=30) with gr.Tab('User Blueprint BuilderπŸ“'): blueprint_drafted = gr.JSON(label="Blueprint Made From Scratch", interactive= False, visible= True, lines=30)""" with gr.Column(scale=6): with gr.Tab('Phantom'): phantom_html = gr.HTML(label="Description", value=phantom_description, visible=True) with gr.Column(scale=6): with gr.Group(): copilot_instruct_phantom = gr.Textbox(label="Instruct", visible= False, value="Read Carefully the Feed Insight. Base your answers and rationale on the Feed Insight provided. If no Feed Insight is provided, answer that you need more data to provide an answer. 1. Follow IMMEDIATELY with 2-3 sentence reasoning citing evidence based on the Blueprint structure and metamarker scores. You will be asked a PRIMARY core question, POSSIBLY accompanied by extra context. Analyze any additional context if present, BUT base your FINAL determination PRIMARILY on confirmation of the KEY DETAILS in the MAIN PRIMARY question itself.", placeholder="Instruct", lines=1, interactive= True) copilot_entry_phantom = gr.Textbox(label="E.V.A", value="", placeholder="Gain Insight in Results", lines=1, interactive= True) copilot_insight_result_phantom = gr.HTML(label="E.V.A Insight", value="") with gr.Row(): with gr.Column(scale=5): with gr.Row(): with gr.Column(scale=1): with gr.Tab("Feed"): feed = gr.Textbox(label="Text Feed", value="", lines=30, interactive= True) annotate_btn = gr.Button(value="Run Phantom", interactive= True) with gr.Tab("URL", interactive= False): feed_url = gr.Textbox(label="URL Feed", value="", interactive= True) annotate_url_btn = gr.Button(value="Run Phantom", interactive= True) #lang_txt = gr.Textbox(label="Main Language Detected", value="🌐", lines=1, interactive= False) with gr.Column(scale=1): gradient_scale = gr.HTML(label="Blueprint Gradient Scale", value=html_content, visible=True) feed_insight = gr.HTML(label="Feed Insight", value="Feed Insight") with gr.Column(scale=2): with gr.Group(): feed_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) feed_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) feed_details = gr.HTML(label="Insight Feed Details", value="", visible=False) #chart_plot = gr.Plot(fig) """with gr.Column(scale=4): profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) chart_plot = gr.Plot(fig) """ with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log", visible=False) with gr.Tab('Batch Insight'): with gr.Tab('Historical Data'): with gr.Column(scale=6): with gr.Row(): with gr.Column(scale=6): with gr.Row(): files_batch = gr.Files(label="Upload Files", height= 600, file_types=['txt', 'docx', 'pdf', 'wav', 'mp3']) with gr.Accordion("See Files Timeline", open=False, visible=True): timeline_plot = gr.Plot() with gr.Column(scale=6): with gr.Group(): with gr.Row(): threshold_enabled = gr.Checkbox(label="Conditional Threshold πŸ”", info="Enable to isolate and focus on specific Outcomes", value=False, interactive= True, scale = 3) threshold_operator = gr.Radio(["<", ">"], label=f"Threshold Operator",visible=True, interactive= False, scale=1) threshold_range = gr.Dropdown(label=f"Threshold Range", info="Select what should be filtered 'in' or 'out' from of the dataframe display", visible=True, interactive= False, scale=2) with gr.Row(): n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable if your metamarkers are likely to interact between each other so their combined appearance (de)escalates outcome", visible=True, interactive= False, scale=3) flash_attn = gr.Checkbox(label="Flash Attention ⚑", info="Enable to remove general noise and gain faster process for large contexts. (Experimental)", value=False, interactive= True, scale = 3) with gr.Row(): get_rationale = gr.Checkbox(label="Generate Written Rationale", info="Outcome will take slightly longer.", value=False, interactive= True, scale=2) with gr.Column(scale=6): extract_insight_btn = gr.Button(value ="Extract Insight 🧠", interactive= False) with gr.Column(scale=6): insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) with gr.Group(): with gr.Row(): save_df_btn = gr.Button(value ="Save Data Frame Result πŸ’Ύ", visible=True, interactive= False, scale=1) with gr.Row(): csv_export = gr.File(label="Export DataFrame", scale=5, visible=False) dl_df_btn = gr.Button(value ="Download Data Frame πŸ“₯", visible=False, interactive= True, scale=1) copy_df_btn = gr.Button(value ="Copy Data Frame πŸ“‹", visible=False, interactive= True, scale=1) files_insight_extract = gr.HTML(label="Insight Extracted", value="") with gr.Column(scale=6): terminal_batch = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") with gr.Tab('E.V.A'): with gr.Column(scale=6): temp_slider = gr.Slider(label="Temperature", value=0.3, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False) max_new_tokens_slider = gr.Slider(label="Max new tokens", value=350, minimum=0, maximum=1097, step=64, interactive=True, visible= False) top_p_slider =gr.Slider(label="Top-p", value=0.3, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False) rep_pen_slider = gr.Slider(label="Repetition penalty", value=0.87, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False) pres_pen_slider = gr.Slider(label="Presence penalty", value=0, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False) copilot_instruct = gr.Textbox(label="Instruct", visible= False, value="Read Carefully the Background. Base your answers and rationale on the Background provided. If no Background provided, answer that you need more data to provide an answer. 1. FIRST ALWAYS state a concrete: Yes, Maybe/Unclear or No at the start of your answer. 2. Follow IMMEDIATELY with 1-2 sentence reasoning citing evidence. You will be asked a PRIMARY core question, POSSIBLY accompanied by extra context. Analyze any additional context if present, BUT base your FINAL determination PRIMARILY on confirmation of the KEY DETAILS in the MAIN PRIMARY question itself: If BACKGROUND confirms the KEY PRIMARY DETAILS, mark Yes and cite quote evidence. If SOME but NOT all PRIMARY DETAILS can be confirmed, mark Maybe/Unclear and cite partial evidence. ONLY mark No IF 20% or less of the PRIMARY question details have confirmation and provide a reasoning.", placeholder="Instruct", lines=1, interactive= True) copilot_entry = gr.Textbox(label="How can I help?", value="", placeholder="Gain Insight in Results", lines=1, interactive= True) copilot_insight_result = gr.HTML(label="blank_ Insight", value="") with gr.Row(): with gr.Column(scale=6): #rationale = gr.Textbox(label="Captured Rationale", value="", placeholder="Rationale", lines= 15, interactive= False) saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) empty_df_btn = gr.Button(value ="Empty Data Frame πŸ—‘οΈ", visible=True, interactive= False, scale=1) with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") with gr.Tab('Real-time Profiling'): with gr.Column(scale=6): with gr.Row(): with gr.Column(scale=4): chatbot = gr.ChatInterface(generate,additional_inputs=additional_inputs) #with gr.Row(): # profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) # profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) with gr.Column(scale=2): profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) chart_plot = gr.Plot(fig) with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") #buildBP.click(build_structure, [title, gradients, blueprint, metamarkers]) bp_selector.select(fn=get_bp, inputs=bp_selector, outputs=[blueprint_textbox, blueprint_structure, phantom_html, gradient_scale, threshold_enabled, extract_insight_btn]) threshold_enabled.change(fn=threshold_condition, inputs=[threshold_enabled, blueprint_structure], outputs=[threshold_range, threshold_operator, n_dependency]) #dl_df_btn.click(df_to_csv, inputs=[insight_df], outputs=[fileobj]) #copy_df_btn.click(copy_df, inputs=[insight_df]) chatbot.textbox.submit(fn=update_scores,inputs=[chatbot.textbox, blueprint_structure], outputs=[profile_score, profile_labels, chart_plot]) """gen_gb_btn.click( generate_bp, inputs=[blueprint_title, blueprint_description], outputs=[blueprint_generated], api_name=False, queue=False, )""" # when files are uploaded to files_batch run plot_timeline files_batch.change( plot_timeline, inputs=[files_batch], outputs=[timeline_plot], api_name=False, queue=False, ) extract_insight_btn.click( extract_insight, inputs=[files_batch, blueprint_structure, get_rationale, threshold_enabled, threshold_range, threshold_operator, n_dependency, flash_attn], outputs=[files_insight_extract, insight_df, save_df_btn, terminal_batch], api_name=False, queue=False, ) save_df_btn.click( save_df, inputs=[insight_df], outputs=[saved_insight_df_gr, empty_df_btn], api_name=False, queue=False, ) empty_df_btn.click( empty_saved_df, inputs=[], outputs=[saved_insight_df_gr], api_name=False, queue=False, ) # call funciton when copilot_entry is submitted copilot_entry_phantom.submit( get_copilot_insight, inputs=[copilot_entry_phantom, feed_insight, temp_slider, max_new_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct_phantom], outputs=[copilot_insight_result_phantom], api_name=False, queue=False, ) copilot_entry.submit( get_copilot_insight, inputs=[copilot_entry, saved_insight_df_gr, temp_slider, max_new_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct], outputs=[copilot_insight_result], api_name=False, queue=False, ) feed.submit( annotate, inputs=[feed, blueprint_structure], outputs=[feed_insight, feed_details, feed_score, feed_labels], api_name=False, queue=False, ) annotate_btn.click( annotate, inputs=[feed, blueprint_structure], outputs=[feed_insight, feed_details, feed_score, feed_labels], api_name=False, queue=False, ) annotate_url_btn.click( annotate_url, inputs=[feed_url, blueprint_structure], outputs=[feed_insight, feed_details, feed_score, feed_labels], api_name=False, queue=False, ) if __name__ == "__main__": demo.launch()