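# blankstate.ai Playground demo -- a Gradio app running in the browser via Pyodide
# (the top-level awaits and micropip installs below assume a Pyodide runtime).
#
# The script wires a Gradio UI to two Hugging Face Inference Endpoints:
#   API_URL      - instruction-tuned text generation (chat, summaries, rationales)
#   COGN_API_URL - "Blueprint" scoring, blueprint structure lookup, and OCR
# Input text is split into sentences, each sentence is scored against the selected
# Blueprint (metamarkers + gradient scale), and the results are aggregated into
# HTML insights, dataframes and plots across the Phantom, Batch Insight, E.V.A
# and Real-time Guardrails tabs.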
import random
import gradio as gr
import pandas as pd
import io
import micropip
from collections import defaultdict

## Install extra packages
#await micropip.install('regex')
#package_list = micropip.list()
#print(package_list)
#import requests
import json, re
import os
import js, base64

await micropip.install('plotly')
#await micropip.install('pdfminer.six==20221105')
await micropip.install('python-docx')
await micropip.install("lzma")
await micropip.install('py3langid')
#await micropip.install('emoji')
#await micropip.install('nltk')
#import nltk
#from nltk.tokenize import PunktTextTilingTokenizer

import plotly.graph_objects as go
#import pdfminer.high_level as hl
import docx
#import emoji
import py3langid as langid
from datetime import datetime
from pyodide.http import pyfetch

#HF_TOKEN = "API_KEY"
#print("HF Token: ", HF_TOKEN)
HF_TOKEN = "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"
#HF_TOKEN = ""
#HF_TOKEN = ""
#print("HF Token: ", HF_TOKEN)

COGN_API_URL = "https://kpd7if9sc5qcx4oe.us-east-1.aws.endpoints.huggingface.cloud"
API_URL = "https://zry5b3k46dws8zor.us-east-1.aws.endpoints.huggingface.cloud"

headers = {
    "Authorization": "Bearer " + HF_TOKEN,
    "Content-Type": "application/json",
}

bp_ground_struct = {
    "name": "Corporate Policies",
    "gradients": [
        {"Significant": 1.0},
        {"High": 0.8},
        {"Moderate": 0.6},
        {"Low": 0.4},
        {"Minimal": 0.2},
    ],
    "blueprint": [
        "Communication Policy",
        "Information Handling",
        "Conflicts of Interest",
        "Fair Dealing"
    ],
    "metamarkers": [
        {
            "Communication Policy": [
                {"Casual conversation": 1},
                {"Constructive feedback provided": 0.8},
                {"Respectful tone even in disagreement": 0.8},
                {"Occasional unprofessional remarks": 0.2},
                {"Manipulative language": 0.1},
                {"Abusive, inflammatory language": 0}
            ],
            "Weight": 1
        },
        {
            "Information Handling": [
                {"Casual conversation": 1},
                {"Proper access controls and encryption": 0.8},
                {"Reasonable info protections in place": 0.6},
                {"Inadvertent exposure addressed responsibly": 0.4},
                {"Repeated minor unauthorized exposures": 0.2},
                {"Sharing passwords": 0.2},
                {"Intentional unauthorized exposure": 0}
            ],
            "Weight": 1
        },
        {
            "Conflicts of Interest": [
                {"Casual conversation": 1},
                {"Proactively discloses all conflicts": 0.8},
                {"Abstains appropriately when conflicts arise": 0.6},
                {"May neglect occasional minor conflicts": 0.4},
                {"Fails to abstain from clear conflicts": 0.2},
                {"Actively hides personal interests": 0}
            ],
            "Weight": 1
        },
        {
            "Fair Dealing": [
                {"Casual conversation": 1},
                {"Models equitable business practices": 0.8},
                {"Demonstrates transparent process": 0.6},
                {"Isolated minor process issues": 0.4},
                {"Repeated unfair/non-transparent actions": 0.2},
                {"Providing special treatment": 0.1},
                {"Systematic discriminatory actions": 0}
            ],
            "Weight": 1
        }
    ]
}


async def query(payload):
    data = payload
    #print("Data JSON: ", data)
    response = await pyfetch(API_URL, method="POST", body=json.dumps(data), headers=headers)
    #print("API Response: ", response)
    response_json = await response.json()
    #print("API Response JSON: ", response_json)
    return response_json


async def query_summ(payload):
    data = payload
    #response = requests.request("POST", COGN_API_URL, data=data, headers=headers)
    response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers)
    #print("API Response: ", response)
    response = await response.json()
    #print("API Response JSON: ", response)
    return response


async def query_profile(input, blueprint, lang=False):
    details = True
    local = True
    max_retries = 5
    if lang:
        output = ({
            "inputs": input,
            "blueprint": [blueprint,
                          'BlueprintScore'],
            "parameters": [{"details": details}, {"local": local}, {"lang": lang}]
        })
    else:
        output = ({
            "inputs": input,
            "blueprint": [blueprint, 'BlueprintScore'],
            "parameters": [{"details": details}, {"local": local}]
        })
    #print("In query_profile with Output: ", output)

    # Calling API with payload
    #response = requests.post(COGN_API_URL, headers=headers, json=output)
    response = None
    for retry in range(max_retries):
        try:
            response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers)
            break
        except Exception:
            print("API request failed, retrying attempt %d" % (retry + 1))
    if not response:
        response = ""
        print("API failed after %d retries" % max_retries)
    response = await response.json()
    #print("API Response JSON: ", response)
    # Returning JSON output
    return response


async def get_bp_structure(bp_id):
    try:
        output = ({
            "inputs": "",
            "get_blueprint_structure": bp_id
        })
        # Calling API with payload
        #response = requests.post(COGN_API_URL, headers=headers, json=output)
        response = await pyfetch(COGN_API_URL, method="POST", body=json.dumps(output), headers=headers)
        response = await response.json()
    except Exception as e:
        print(e)
        return "Pick a Blueprint from the list."
    # Uncomment for dataframe
    """
    data = response
    df_rows = []
    for section in data["m_structure"]:
        for question, options in section.items():
            if question != "Weight":
                for option in options:
                    score = list(option.values())[0]
                    text = list(option.keys())[0]
                    row = [question, text, score]
                    df_rows.append(row)
    df = pd.DataFrame(df_rows, columns=["Metamarkers", "Nuances", "Gradient Scale"])
    """
    #return gr.DataFrame(df)
    return response["m_structure"]


copilot_instruct_global = ""


def format_prompt(message, history, task, metamarker=None, proof=None, outcome=None):
    global copilot_instruct_global
    prompt = "<s>"
    if task == "summarise":
        print("Summarise task")
        message = str(message)
        prompt += f"[INST]Summarise the following Client's Suitability Review: {message}.[/INST]"
    elif task == "checklist":
        print("Checklist Prep task")
        message = str(message)
        prompt += f"[INST]Extract important information from this old report and list them into bullet points. Separate it into 'Financial Profile' and 'Personal Circumstances' categories. Old Report:\n {message} [/INST]"
    elif task == "insight":
        #print("Investment Goal Evolution task")
        #print("Conversation: ", message)
        #print("Summary History: ", history)
        message = str(message)
        prompt += f"[INST]In two sentences, generate a summary of the Client's evolving investment goal appetite between this recent conversation: {message} \n and the past Client report with their old appetite: {history}[/INST]"
    elif task == "genBP":
        print("generate BP")
        prompt += f"[INST]Generate a JSON following this exact structure:\n {bp_ground_struct} \n -and adapt name, blueprint, metamarkers cues parameters based on the following name and description: {message}. \n - The metamarkers cues should be detectable from written interactions.[/INST]"
    elif task == "gen rationale":
        print("gen rationale")
        prompt += f"[INST] Do NOT directly quote the instructions. Criteria: {metamarker} Proof: {proof}. Base your answer on the Proof. In YOUR OWN WORDS, provide a 2 SENTENCE plain language rationale no more than 40 words long explaining why this Outcome occurred. Outcome: '{outcome}'. At the end, list the decision and important dates, cite the important figures and scores supporting the Outcome.
[/INST]" elif task == "copilot insight": print("gen copilot insight") #Format message # Normalize whitespace message = " ".join(message.split()) # Remove extra line breaks message = re.sub(r"\n\s*", " ", message) # Strip extra spaces between words message = re.sub(r"\s\s+", " ", message) # Remove hyphen bullet points message = re.sub(r"\-\s*", "", message) # Standardize space around punctuation message = re.sub(r"\s+([\.?,!:])", r"\1", message) prompt += f"[INST]{copilot_instruct_global} \n\n Background: {history}\n\n Question: {message}\n\n[/INST]" else: print("Other Task") #print("Conversation: ", message) prompt += f"[INST]You are my dedicated advisor from HSBC, You will try to help me find the best possible investment solution. Let's have a chat.\n\n- Don't repeat yourself.\n- Keep your messages relatively short.\n {message} [/INST]" return prompt async def generate(prompt, history, task, temperature, max_new_tokens, top_p, repetition_penalty, presence_penalty, metamarker=None, proof=None, outcome=None): print("In generate", prompt) if temperature == None and task == None: task = "" temperature = 0.9 max_new_tokens = 250 top_p = 0.9 repetition_penalty = 1.0 presence_penalty = 1.5 payload = { 'inputs': format_prompt(prompt, history, task, metamarker, proof, outcome), 'parameters': { 'temperature': temperature, 'max_new_tokens': max_new_tokens, 'top_p': top_p, 'repetition_penalty': repetition_penalty, 'presence_penalty': presence_penalty, 'generator': True, 'do_sample': False, 'use_cache': True } } print("Payload: ", payload) #demo.update() #state.profile_scores = scores try: output = await query(payload) #print("Output: ", output) except Exception as e: print(e) return "Error: API request unsuccessful. Please wait a few minutes and try again." try: return output[0]["generated_text"] except Exception as e: print(e) return output[0] ### Generate Blueprint ### async def generate_bp(blueprint_title, blueprint_description): print("In Generate Blueprint") try: bp_direction = f"name: {blueprint_title}\nDescription: {blueprint_description}" #bp_direction = f"{blueprint_description}" blueprint_json = await generate(bp_direction, "", "genBP", 0.9, 1097, 0.9, 1, 1.1) blueprint_json = re.sub(r'^[^{}]*', '', blueprint_json) # Remove before the first blueprint_json = re.sub(r'}[^{}]*$', '}', blueprint_json) # Remove after the last #print("Blueprint JSON: ", blueprint_json) # load text to a json object blueprint_json = json.loads(blueprint_json) #print("Blueprint JSON loaded: ", blueprint_json) return blueprint_json except Exception as e: print(e) return "Add a Blueprint Title and Description" # Create the initial figure fig = go.Figure( layout=dict( title='Blueprint Scatter Graph', xaxis_title='Timestamp', yaxis_title='Score'), ) # Define marker properties marker_size = 10 marker_color = [0] # Generate random marker colors marker_colorscale = 'Plasma' # Choose a color scale show_marker_scale = True # Create the initial trace with placeholder marker properties initial_trace = go.Scatter(x=[datetime.now(), datetime.now()], y=[0, 10], mode='markers', showlegend=False,) # Add the marker properties to the initial trace initial_trace.update(marker=dict( cmax=10, cmin=0, size=1, color=marker_color, colorscale=marker_colorscale, showscale=show_marker_scale )) async def ocr_file_base64(file_path): print("In OCR") with open(file_path, 'rb') as file: encoded_file = base64.b64encode(file.read()).decode('utf-8') data = { "inputs": "", "extract_text": encoded_file } print("Data JSON: ", data) response = await 
pyfetch(COGN_API_URL, method="POST", body=json.dumps(data), headers=headers) print("API Response: ", response) response = await response.json() print("API Response JSON: ", response) # Returning JSON output return response # Add the initial trace to the figure fig.add_trace(initial_trace) async def update_scores(prompt, blueprint): try: blueprint = json.loads(blueprint) bp_result = await query_profile(prompt, blueprint) #print("Blueprint Result", bp_result) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) if score < 0: score = 0.00 # empty bp_result[1]["mw_score"] to avoid error bp_result[1]["mw_score"] = {} scores = [score] #print("Scores: ", scores) for i in range(len(scores)): #print("Individual scores: ", scores[i]) new_trace = go.Scatter(x=[datetime.now()], y=[float(scores[i])*10], mode='markers', showlegend=False, marker=dict( cmax=10, cmin=0, size=10, color=[float(scores[i])*10] , #set color equal to a variable colorscale='Plasma', # one of plotly colorscales showscale=False ) ) fig.add_trace(new_trace) return score, bp_result[1]["mw_score"], gr.Plot(fig) except Exception as e: print(e) return "Select a Blueprint First", "", gr.Plot(fig) def sliding_window_sentences(text, window_size=3, step_size=1): print("In sliding_window") """ Args: text (str): The text to split. window_size (int): Number of sentences in each window. step_size (int): Number of sentences to shift the window. Yields: list: A list of stripped sentences representing the current window. """ sentences = [] for i, char in enumerate(text): if char in '.?!\n': sentences.append(char) if len(sentences) == window_size: yield [sentence.strip() for sentence in sentences[:window_size]] sentences = sentences[step_size:] if sentences: yield [sentence.strip() for sentence in sentences] global_total_score = 0 global_num_sentences = 0 global_meta_scores = [] global_metamarker_averages = {} scored_sentences = [] #Format Text and call model async def format_text(text, fileName, blueprint): print("In format_text") global global_total_score global global_num_sentences global global_meta_scores global global_metamarker_averages global scored_sentences #lowercase #text = text.lower() #remove punctuation #text = text.translate(str.maketrans('', '', string.punctuation)) #remove whitespace #remove empty strings #text = list(filter(None, text)) # Define a regex pattern for splitting sentences #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n)') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.)(?<!\d\.)(?=\d)|(?<=[.?!\n])|(?<=\n\n(?![^\w\s]))|(?<!\d)\.(?!\d)') #sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)\s') # Use the pattern to split the text into sentences sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)') sentences = sentence_pattern.split(text) # with nltk #sentences = nltk.sent_tokenize(text) #windows = sliding_window_sentences(text) #sentences = list(windows) print("Sentences: ", sentences) scored_sentences_str = [] total_score = 0 num_sentences = 0 meta_sums = defaultdict(float) meta_counts = defaultdict(int) # for each sentence run Model for sentence in sentences: # Skip empty sentences if not sentence.strip(): continue bp_result = await query_profile(sentence, blueprint) #print("Blueprint Result", bp_result) #print("Blueprint Nuances Scores", 
bp_result[2]["n_scores"]) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) if score >= 0: total_score += score num_sentences += 1 metamarkers_score = bp_result[1]["mw_score"] #print("Metamarkers Scores: ", metamarkers_score) for key, value in metamarkers_score.items(): #print("Metamarkers Key: ", key) #print("Metamarkers Value: ", value) meta_sums[key] += value meta_counts[key] += 1 #print("Metamarkers Sums: ", meta_sums) # Create a string with the metamarkers scores meta_str = "" for key, value in metamarkers_score.items(): meta_str += f"{key}: {value} | " #print("Metamarkers String: ", meta_str) nuances_score = bp_result[2]["n_scores"] #print("Nuances Scores: ", nuances_score) #Trim trailing | meta_str = meta_str[:-3] # append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName scored_sentences_str.append([sentence, score, meta_str]) scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName]) # Calculate averages ## scores if num_sentences > 0: avg_score = total_score / num_sentences else: avg_score = 0 ## metamarkers meta_avgs = {} for key in meta_sums.keys(): meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2) #print("Metamarkers Averages: ", meta_avgs) # Update globals global_total_score += avg_score global_meta_scores.append(meta_avgs) meta_avg_str = "" for key, value in meta_avgs.items(): meta_avg_str += f"    {key}: {value} |</br>" # Trim last | meta_avg_str = meta_avg_str[:-3] #text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>" text = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary" # comment to hide details #text += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br>   -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str]) #text += "</details></br> <b>-- End of File -- </b></br></br></br>" text = "Start of File: "+fileName+"\n\n"+re.sub('\s+',' ', text)+"\n\n -- End of File -- \n\n\n" return text triggers_list = [] num_triggers = 0 # Extract text from files async def extract_insight(files, blueprint, get_rationale = False, threshold_enabled = False, threshold_range = None, threshold_operator = None, n_dependency = None): print("In extract_insight") blueprint = json.loads(blueprint) # reset global values for new batch global global_total_score global global_meta_scores global global_metamarker_averages global scored_sentences global_total_score = 0 global_meta_scores = [] global_metamarker_averages = {} scored_sentences = [] global triggers_list global num_triggers #print("Triggers List: ", triggers_list) #print("Num Triggers: ", num_triggers) if files == None: return "Please Upload a Supported File First" , "" try: batched_text = "" for file in files: file_path = file.name file_name = os.path.basename(file.name) if file_path.endswith('.pdf'): text = "" text = await ocr_file_base64(file_path) print("OCR Text extracted: ", text) elif file_path.endswith('.docx'): document = docx.Document(file_path) text = "" text = "\n".join(paragraph.text + " " for paragraph in document.paragraphs) #print("Text extracted: ", text) batched_text += await format_text(text, file_name, blueprint) #bp_result = 
await query_profile(batched_text, blueprint) #score = round((float(bp_result[0]["BlueprintScore"])),2) # After processing all files num_files = len(files) # divide global total avg_score by number of files global_avg_score = global_total_score / num_files #print("Global Average Score: ", global_avg_score) for meta_scores in global_meta_scores: # Iterate over each metamarker in the file for metamarker, score in meta_scores.items(): # Update the running total for each metamarker global_metamarker_averages.setdefault(metamarker, 0) # divide global total avg_score by number of files global_metamarker_averages[metamarker] += score / num_files # round to 2 decimals global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker], 2) # Calculate the average for each metamarker gradients = blueprint["gradients"] sorted_gradients = sorted(blueprint["gradients"], key=lambda x: list(x.values())[0]) for metamarker in global_metamarker_averages: #global_metamarker_averages[metamarker] = round(global_metamarker_averages[metamarker] / num_files, 2) #print("Global Metamarkers Averages: ", global_metamarker_averages) global_meta_avg_str = "" rows = [] for key, value in global_metamarker_averages.items(): global_meta_avg_str += f"    {key}: {value} |</br>" trigger_outcome = "Uncat" highest_gradient = 0 for gradient in gradients: for gradient_key, gradient_value in gradient.items(): if value >= gradient_value and gradient_value >= highest_gradient: trigger_outcome = gradient_key highest_gradient = gradient_value row = { "Metamarkers": key, "Output": value, "Gradient Match": trigger_outcome, "Rationale": "", "Source": "" } rows.append(row) # Iterate through each scored sentence and update the Rationale lower_bound = 0.0 higher_bound = 1.0 rationale_gen = "" document_proof = [] document_name = "" previous_doc = "" tracked_nuances = {} nuance_weight = 0 global_avg_score = 0 metamarkers_global_weights = 0 for sentence_data in scored_sentences: sentence = sentence_data[0] sentence_scores = sentence_data[2] sentence_n_scores = sentence_data[3] document_name = sentence_data[4] #print("Sentence nuances scores: ", sentence_n_scores) #HACK to add to model handler as parameter # for each sentence get the highest score of the metamarkers nuance score sum it to itself. if the nuance was already found as highest score from the previous sentence in this metamarker, don't sum it up # add find and break option # check if previous nuance is similar but current score is bigger then keep the biggest one overriding the previous one with the same nuance key by removing its score from the total. 
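# In plain terms (Additive Dependency, enabled via the n_dependency checkbox):
# for every metamarker, take the sentence's top-scoring nuance, look up that
# nuance's weight in the blueprint, and add weight * nuance_score to the
# metamarker's sentence score the first time that nuance is seen; if the same
# nuance later reappears with a higher score, the earlier contribution is
# swapped out for the higher one rather than counted twice.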
if n_dependency: # reset global avg score for metamarker in sentence_scores: #sentence_scores[metamarker] = 0 top_nuance = max(sentence_n_scores[metamarker], key=sentence_n_scores[metamarker].get) #print("Top Nuance Score: ", sentence_n_scores[metamarker][top_nuance]) # if sentence_n_scores[metamarker][top_nuance] exist and is bigger than previous sentence_n_scores[metamarker][top_nuance] if top_nuance not in tracked_nuances.get(metamarker, []): print("Top Nuance not tracked yet or new nuance score greater than previous") tracked_nuances.setdefault(metamarker, []).append(top_nuance) # this score is correct #print(f"Sentence Score Before adding: {sentence_scores[metamarker]}") #print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}") # get the nuance weight from the blueprint associated with the top_nuance and metamarker #print("Blueprint metamarkers: ", blueprint["metamarkers"]) top_nuance_strip = top_nuance.rstrip('.') for metamarker_dict in blueprint["metamarkers"]: if metamarker in metamarker_dict: categories = metamarker_dict[metamarker] for category in categories: if top_nuance_strip in category: nuance_weight = category[top_nuance_strip] break break #print(f"Nuance Weight {nuance_weight}") sentence_scores[metamarker] += nuance_weight * sentence_n_scores[metamarker][top_nuance] #sentence_scores[metamarker] += sentence_n_scores[metamarker][top_nuance] #print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}") if sentence_n_scores[metamarker][top_nuance] > sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]: #print("Top Nuance not tracked yet or new nuance score greater than previous") tracked_nuances.setdefault(metamarker, []).append(top_nuance) # this score is correct #print(f"Sentence Score Before adding: {sentence_scores[metamarker]}") #print(f"Metamarker: {metamarker} | Top Nuance: {top_nuance} | Score: {sentence_n_scores[metamarker][top_nuance]} | Sentence: {sentence}") # get the nuance weight from the blueprint associated with the top_nuance and metamarker #print("Blueprint metamarkers: ", blueprint["metamarkers"]) top_nuance_strip = top_nuance.rstrip('.') for metamarker_dict in blueprint["metamarkers"]: if metamarker in metamarker_dict: categories = metamarker_dict[metamarker] for category in categories: if top_nuance_strip in category: nuance_weight = category[top_nuance_strip] break break #print(f"Nuance Weight {nuance_weight}") #TODO # if new sentence_n_scores[metamarker][top_nuance] is bigger than the tracked previous one for the same nuance then remove the previous score from sentence_scores[metamarker] and then += the new one with the associated nuance weight # add the score of the top nuance to the metamarker score multiplied by the blueprint nuance weight # remove the previous sentence_n_scores[metamarker][top_nuance] from sentence_scores[metamarker] and then += the new one with the associated nuance weight #print("New Nuance Score is bigger than previous one") #print(f"Previous Nuance Score: {sentence_n_scores[metamarker][tracked_nuances[metamarker][0]]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}") #print(f"New Nuance Score: {sentence_n_scores[metamarker][top_nuance]} for metamarker {metamarker} and nuance {tracked_nuances[metamarker][0]}") sentence_scores[metamarker] -= nuance_weight * sentence_n_scores[metamarker][tracked_nuances[metamarker][0]] sentence_scores[metamarker] += nuance_weight * 
sentence_n_scores[metamarker][top_nuance] #print(f"Sentence Score after adding: {sentence_scores} with Nuance weight: {nuance_weight}") # Iterate through metamarkers #print("Sentence Data Score before loop: ", sentence_scores) for metamarker, score in sentence_scores.items(): #print("Sentence Data Score in loop to update Dataframe: ", sentence_scores) # Find the corresponding row in the table #print(f"Metamarker: {metamarker} | Score: {score} | Sentence: {sentence}") for i, row in enumerate(rows): if row["Metamarkers"] == metamarker: # Check if the sentence score falls between the current and next gradient values lower_bound, higher_bound = find_lower_and_higher(row["Output"], sorted_gradients.copy()) #print("Lower Bound: ", lower_bound) #print("Higher Bound: ", higher_bound) # if threhold enabled, if sentence score is above threshold_range then change Output value to sentence score, stop looping through metamarkers if threshold_enabled: print("Threshold Enabled") #print("Threshold Range: ", threshold_range) threshold_range_value = list(threshold_range.values())[0] if eval(str(score) + threshold_operator + str(threshold_range_value)): #print(f"Score {threshold_operator} {threshold_range_value}") # Split source into docs docs = row["Source"].split("\n") # Remove empty strings docs = list(filter(None, docs)) # Check if current doc seen if document_name not in row["Source"]: row["Source"] += f" File: {document_name}  " # Append the sentence to the "Rationale" with appropriate formatting score = round(score, 3) # Check if Output is already assigned if "Output" not in row or eval(str(score) + threshold_operator + str(row["Output"])): if score > 1: score = 1 row["Output"] = score trigger_outcome = "Uncat" highest_gradient = 0 for gradient in gradients: for gradient_key, gradient_value in gradient.items(): if row["Output"] >= gradient_value and gradient_value >= highest_gradient: trigger_outcome = gradient_key highest_gradient = gradient_value #print("Trigger Outcome: ", trigger_outcome) #print("Highest gradient: ", highest_gradient) row["Gradient Match"] = trigger_outcome #print("Score: ", score) row["Rationale"] += f"{sentence} | \n " #row["Rationale"] += f"{sentence} {score} |  " break elif score >= lower_bound and score <= higher_bound: # Split source into docs docs = row["Source"].split("\n") # Remove empty strings docs = list(filter(None, docs)) # Check if current doc seen if document_name not in row["Source"]: row["Source"] += f" File: {document_name}  " # Append the sentence to the "Rationale" with appropriate formatting score = round(score, 3) row["Rationale"] += f"{sentence} {score} |  " print("Updated Rationale:", row["Rationale"]) # append to document_proof the sentence and its score for associated to the metamarker and the fileName #document_proof.append([sentence, score, metamarker, sentence_data[3]]) #print("Document Proof: ", document_proof) break # Stop searching for the row once found #print("Sentence Scores: ", sentence_scores) # Generate written rationale if get_rationale: for row in rows: # if rationale is not empty if row["Rationale"] != "": #TODO count characters length, if more than 10000 , then split it and generate the first part of the rationale # if rationale is longer than 10000 characters then split it in equal parts base don the total length and back to the previous punctuation and generate the first part of the rationale then loop through the rest of the parts and generate the rest of the rationale max_tokens = 3900 buffer = 100 tokens = 
re.split(r'\W+',row["Rationale"]) if len(tokens) > max_tokens: print("Rationale is longer than 10000 characters") print("Rationale Length: ", len(row["Rationale"])) print("Rationale: ", row["Rationale"]) print("Rationale Type: ", type(row["Rationale"])) #rationale_parts = [row["Rationale"][i:i+max_tokens] for i in range(0, len(row["Rationale"]), max_tokens)] #print("Rationale Parts Split: ", rationale_parts) rationale = row["Rationale"] rationale_parts = [] while len(rationale) > 0: part = rationale[:max_tokens] rationale_parts.append(part) rationale = rationale[max_tokens:] generated_rationales = [] for part in rationale_parts: generated = await generate("", "", "gen rationale", 0.3, 250, 0.3, 1.15, 0, row["Metamarkers"], part, row["Gradient Match"]) generated_rationales.append(generated) full_rationale = " ".join(generated_rationales) row["Rationale"] = full_rationale else: row["Rationale"] = await generate("", "", "gen rationale", 0.3, 250, 0.3, 1.15, 0, row["Metamarkers"], row["Rationale"], row["Gradient Match"]) #print("Global Metamarker Avg.: ", global_metamarker_averages.items()) # Uncomment to obtain manual Triggers """ if num_triggers>0: for key, value in global_metamarker_averages.items(): trigger_outcome = "" #print("Num Triggers: ", num_triggers) for i in range(num_triggers): trigger = triggers_list[i] name = trigger[0] operator = trigger[1] threshold = trigger[2] if eval(str(value) + operator + str(threshold)): if trigger_outcome == "": trigger_outcome = name else: trigger_outcome += ", " + name row = { "Metamarkers": key, "Meta_Score": value, "Trigger Outcome": trigger_outcome, "Rationale": "" } rows.append(row) else: for key, value in global_metamarker_averages.items(): row = { "Metamarkers": key, "Meta_Score": value, "Trigger Outcome": "", "Rationale": "" } rows.append(row) data_for_dataframe = pd.DataFrame(rows) """ # sum the total of row["Output"] from each rows print("Rows: ", rows) for row in rows: print("Row: ", row) for metamarkers in blueprint["metamarkers"]: print("Metamarkers: ", metamarkers) if row["Metamarkers"] in metamarkers: metamarkers_global_weights+= metamarkers["Weight"] print("Metamarkers Global Weights: ", metamarkers_global_weights) global_avg_score += row["Output"] * metamarkers["Weight"] print("Global Average Score: ", global_avg_score) global_avg_score = global_avg_score/metamarkers_global_weights global_avg_score_gradient = "" # round global_avg_score to 2 decimals global_avg_score = round(global_avg_score, 3) #based on global_avg_score find the blueprint gradient it falls into for gradient in sorted_gradients: for gradient_key, gradient_value in gradient.items(): print("Gradient Key: ", gradient_key) if global_avg_score >= gradient_value: print("Global Average Score: ", global_avg_score) print("Gradient Value: ", gradient_value) global_avg_score_gradient = gradient_key break data_for_dataframe = pd.DataFrame(rows) #print("Dataframe: ", data_for_dataframe) # Trim last | global_meta_avg_str = global_meta_avg_str[:-3] print("Metamarkers Global Weights: ", metamarkers_global_weights) insight = "" insight = f"<h4>Gradient Match Outcome: <b>{global_avg_score_gradient}</b></br>Global Weighted Average: <b>{global_avg_score}</b></h4>" #insight = f"<b>Batch General Score: {global_avg_score:.3f}</b> </br></br><b>Batch Metamarkers General Score: </b> </br> {global_meta_avg_str} </br></br></br></br> <details><summary><b>Insight Details </b></summary></br>{batched_text}</details>" # make df_dl_btn visible #dl_df_btn = gr.Button(value ="Download Data Frame πŸ“₯", 
visible=True, interactive= True) # Save DataFrame to CSV file #csv_filename = "./b_df_export.csv" #data_for_dataframe.to_csv(csv_filename, index=True) #print("ABS Path to CSV File: ", os.path.abspath(csv_filename)) #dfcsv = gr.File(csv_filename, label="Export DataFrame", scale=5, visible=False) #copy_df_btn = gr.Button(value ="Copy Data Frame πŸ“‹", visible=True, interactive= True, scale=1) save_df_btn = gr.Button(value ="Save Data Frame Result πŸ’Ύ", visible=True, interactive= True, scale=1) return insight, data_for_dataframe, save_df_btn except Exception as e: print(e) return "Pick a Blueprint" def find_lower_and_higher(score, gradients): # Sort gradients based on values #print("In find_lower_and_higher") lower_bound = 0 higher_bound = 1 for gradient in gradients: label, value = list(gradient.items())[0] if value <= score: lower_bound = max(lower_bound, value) elif value > score: higher_bound = min(higher_bound, value) return lower_bound, higher_bound async def get_bp(bp_selector): print("In get_bp") #print("Selected Blueprint: ", bp_selector) bp_structure = await get_bp_structure(bp_selector) threshold_enabled = gr.Checkbox(label="Conditional Threshold πŸ”", info="Enable to isolate and focus on specific Outcomes", interactive= True) extract_insight_btn = gr.Button(value ="Extract Insight 🧠", interactive= True) # get blueprint name print("Blueprint Structure: ", bp_structure) bp_name = bp_structure['name'] #get blueprint's blueprint in a List bp_blueprint = bp_structure['blueprint'] bp_gradient = bp_structure['gradients'] phantom_description = f"<center> <h3>Multilingual Behaviour Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. 
</br><b>πŸ”’ No data retained nor future training required.</b> </p> </center> </br><p><b>Protocole:</b> '{bp_name}' </p><p><b>Markers:</b> '{bp_blueprint}'</p><p><b>Scale:</b> '{bp_gradient}'</p>" scale_gradients = bp_gradient colors = { 0: "#F7FFF7", 0.25: "#FDE0DD", 0.5: "#21918c", 0.75: "#5ec962", 1: "#fde725" } html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>" for gradient in scale_gradients: label, value = list(gradient.items())[0] color = colors[min(colors.keys(), key=lambda x: abs(x - value))] html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>" html_content += "</div>" return bp_selector, gr.Code(value=json.dumps(bp_structure, indent=2, ensure_ascii=False)), phantom_description, html_content, threshold_enabled, extract_insight_btn ## Gradio Interface## max_triggers = 3 pd.set_option('display.max_colwidth', None) pd.set_option('display.max_rows', None) pd.set_option('display.max_columns', None) saved_insight_df = pd.DataFrame() saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) def variable_outputs(k): global num_triggers k = int(k) num_triggers = k outputs = [] for i in range(max_triggers): if i < k: visible = True else: visible = False outputs.append(gr.Textbox(visible=visible, interactive= True)) outputs.append(gr.Radio(["<", ">"], visible=visible, interactive= True)) outputs.append(gr.Slider(visible=visible, interactive= True)) return outputs def get_triggers_list_values(trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9): #print("Triggers: ", trig1, trig2, trig3, trig4, trig5, trig6, trig7, trig8, trig9) global triggers_list # add triggers to a 2d list 3 by 3 triggers_list = [] triggers_list.append([trig1, trig2, trig3]) triggers_list.append([trig4, trig5, trig6]) triggers_list.append([trig7, trig8, trig9]) #print("Triggers List: ", triggers_list) def threshold_condition(tc, bp_structure): print("In TC") print("Threshold Condition: ", tc) bp_structure = json.loads(bp_structure) print("Blueprint Structure: ", bp_structure["gradients"]) threshold_range = gr.Dropdown(bp_structure["gradients"], label=f"Threshold Range",info="Select what should be filtered 'in' or 'out' from of the dataframe display", visible=True, interactive= tc) threshold_operator = gr.Radio(["<", ">"], label=f"Threshold Operator",visible=True, interactive= tc) n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable if your metamarkers are likely to interact between each other so their combined appearance (de)escalates outcome", visible=True, interactive= tc) return threshold_range, threshold_operator, n_dependency def build_structure(title, gradients, blueprint, metamarkers): print("In BuildBP") structure = { "name": title, "gradients": [{"Significant": 1.0}], "blueprint": blueprint.splitlines(), "metamarkers": [ {item.split(":")[0].strip(): [{"Casual conversation": 1}] for item in metamarkers.splitlines() if ":" in item} ] } print("Structure: ", structure) # Function to handle button click and generate CSV def df_to_csv(df): print("In df_to_csv") # Save DataFrame to CSV file csv_filename = "b_df_export.csv" df.to_csv(csv_filename, index=False) fileobj = gr.File(csv_filename, label="Exported DataFrame", scale=5, visible=True) return fileobj def copy_df(df): print("In copy_df") # Copy DataFrame to clipboard print("Dataframe: ", df) 
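    # Note: DataFrame.to_clipboard relies on a system clipboard (pyperclip and
    # friends), which is generally unavailable inside the Pyodide/browser
    # sandbox, so this handler is likely to raise there; the corresponding
    # copy_df_btn.click wiring further down is commented out.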
df.to_clipboard(index=False) return "DataFrame copied to clipboard" def save_df(df): print("In save_df with: ", df) global saved_insight_df # save DF in different DF in Copilot Tab #saved_insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) #TODO Check if new_row exists in df #saved_insight_df = saved_insight_df.append(df) saved_insight_df = pd.concat([saved_insight_df, df]).drop_duplicates().reset_index(drop=True) empty_df_btn = gr.Button(value ="Empty Data Frame πŸ—‘οΈ", visible=True, interactive= True, scale=1) return saved_insight_df, empty_df_btn def empty_saved_df(): print("In empty_saved_df") global saved_insight_df saved_insight_df = pd.DataFrame() saved_insight_df_gr = gr.Dataframe(pd.DataFrame(), headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) return saved_insight_df_gr async def get_copilot_insight(prompt, df, temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct): global copilot_instruct_global copilot_instruct_global = copilot_instruct print("In get_copilot_insight") # get the value of the column rationale from the dataframe rationale = df['Rationale'].astype(str).reset_index(drop=True).values print("Rationale From DF: ", rationale) # generate insight from dataframe copilot_insight = await generate(prompt, rationale, "copilot insight", temp_slider, max_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider) return copilot_insight #HACK make redundant functions on API side and multithreaded async def annotate(feed, blueprint): fileName = "Feed" try: blueprint = json.loads(blueprint) print("Blueprint: ", blueprint) #results = await query_profile(feed, blueprint) global global_total_score global global_num_sentences global global_meta_scores global global_metamarker_averages global scored_sentences #lowercase #text = text.lower() #remove punctuation #text = text.translate(str.maketrans('', '', string.punctuation)) #remove whitespace #remove empty strings #text = list(filter(None, text)) # Define a regex pattern for splitting sentences sentence_pattern = re.compile(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|!|\n)(?!\d\.)(?=\d)\s|(?<=[?!\n])|(?<!\d)\.(?!\d)') # Use the pattern to split the text into sentences sentences = sentence_pattern.split(feed) # with nltk #sentences = nltk.sent_tokenize(text) #windows = sliding_window_sentences(text) #sentences = list(windows) scored_sentences_str = [] total_score = 0 num_sentences = 0 meta_sums = defaultdict(float) meta_counts = defaultdict(int) html = "" sentences_grounded = "" language_code = "" # for each sentence run Model for sentence in sentences: # Skip empty sentences if not sentence.strip(): continue language_code, flag = detect_lang(sentence) if language_code == "en": bp_result = await query_profile(sentence, blueprint) print("Text is English") else: bp_result = await query_profile(sentence, blueprint, lang=True) # Get the top level blueprint score blueprint_score = bp_result[0]["BlueprintScore"] print("Blueprint Score: ", blueprint_score) # Get the metamarker scores metamarkers = bp_result[1]["mw_score"] # Create the annotations annotations = {} mm_score_positive = False for mm, score in metamarkers.items(): annotations[mm] = (str(round(score, 2))) if score > 0: mm_score_positive = True print(annotations) print("Sentence: ", feed) # change the hexadecimal color based on the 
blueprint score going from gray-blue to green passing by red colors = { 0: "#F7FFF7", 0.25: "#FDE0DD", 0.5: "#21918c", 0.75: "#5ec962", 1: "#fde725" } if score <= 0.05: hex_color = colors[0] text_color = "#000000" elif score >= 1: hex_color = colors[1] text_color = "#FFFFFF" else: lower = max([x for x in colors.keys() if x <= score]) higher = min([x for x in colors.keys() if x >= score]) range = higher - lower progress = (score - lower) / range lower_color = colors[lower] higher_color = colors[higher] r1, g1, b1 = tuple(int(lower_color[i:i+2], 16) for i in (1, 3, 5)) r2, g2, b2 = tuple(int(higher_color[i:i+2], 16) for i in (1, 3, 5)) r = round(r1 + (r2 - r1) * progress) g = round(g1 + (g2 - g1) * progress) b = round(b1 + (b2 - b1) * progress) hex_color = "#{:02x}{:02x}{:02x}".format(r, g, b) # if hexa is dark then text color is white else black text_color = "#FFFFFF" if (r*0.299 + g*0.587 + b*0.114) < 186 else "#000000" highlighted = f"<mark style='background: {hex_color} !important; color: {text_color}'>{sentence}</mark>" #highlighted = f"<mark style='background: #00ced1 !important'> {sentence} </mark>" #print("Blueprint Result", bp_result) #print("Blueprint Nuances Scores", bp_result[2]["n_scores"]) score = round((float(bp_result[0]["BlueprintScore"])),2) #print("Scores: ", score) # only count not grounded sentences if at least one annotation is greater than 0 if mm_score_positive: html += f"{highlighted}<i>    - Gradient Score: </i>{score}</br>    - <i>Metamarkers: </i>{annotations}</br> <img src='{flag}' width='20'/></br>" total_score += score num_sentences += 1 metamarkers_score = bp_result[1]["mw_score"] #print("Metamarkers Scores: ", metamarkers_score) for key, value in metamarkers_score.items(): #print("Metamarkers Key: ", key) #print("Metamarkers Value: ", value) meta_sums[key] += value meta_counts[key] += 1 #print("Metamarkers Sums: ", meta_sums) # Create a string with the metamarkers scores meta_str = "" for key, value in metamarkers_score.items(): meta_str += f"{key}: {value} | " #print("Metamarkers String: ", meta_str) nuances_score = bp_result[2]["n_scores"] #print("Nuances Scores: ", nuances_score) #Trim trailing | meta_str = meta_str[:-3] # append scored_sentences with the sentence and its score and metamarkers scores and nuances_scores and fileName scored_sentences_str.append([sentence, score, meta_str]) scored_sentences.append([sentence, score, metamarkers_score, nuances_score, fileName]) else: # if score 0 or less, put the sentence in a list of sentences grounded sentences_grounded += f"{highlighted}</br> - Gradient Score: </i>{score}</br>    - <i>Metamarkers: </i>{annotations}</br>" # Calculate averages ## scores if num_sentences > 0: avg_score = total_score / num_sentences else: avg_score = 0 ## metamarkers meta_avgs = {} for key in meta_sums.keys(): meta_avgs[key] = round(meta_sums[key] / meta_counts[key], 2) #print("Metamarkers Averages: ", meta_avgs) # Update globals global_total_score += avg_score global_meta_scores.append(meta_avgs) meta_avg_str = "" for key, value in meta_avgs.items(): meta_avg_str += f"    {key}: {value} |</br>" # Trim last | meta_avg_str = meta_avg_str[:-3] #feed_details = f"Start of File: <b>{fileName}</b></br></br> <b>General Score: {avg_score:.2f}</b> </br></br><b>Global Feed Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary><b>Insight Details </b></summary></br>" feed_details = f"<b>General Score: {avg_score:.2f}</b> </br></br><b>File Score per Metamarker: </b> </br> {meta_avg_str} </br></br> <details><summary" # 
comment to hide details feed_details += "".join([f"Sub-Segment: '{sentence}' -- <mark>-Weighted Score: <b>{score}</b></mark></br>   -- MetaScores: <i>{meta}</i></br></br>" for sentence, score, meta in scored_sentences_str]) feed_details += "</details></br> <b>-- End of Feed -- </b></br></br></br>" feed_details = "\n\n"+re.sub('\s+',' ', feed_details)+"\n\n\n" # format html in a better way with a header first html = f"<h3>Feed Insight</h3></br>{html} </br> </br><details>" html += f"<b>Grounded Segments: </b></br>{sentences_grounded}</details>" return html, feed_details, round(avg_score,2), meta_avgs except Exception as e: print(e) return sentence def detect_lang(text): """Detect Language""" language_code, _ = langid.classify(text) # Map language codes to emoji flags lang_to_flag = { 'en': 'https://flagcdn.com/w40/us.png', 'es': 'https://flagcdn.com/w40/es.png', 'fr': 'https://flagcdn.com/w40/fr.png', 'de': 'https://flagcdn.com/w40/de.png', 'it': 'https://flagcdn.com/w40/it.png', 'pt': 'https://flagcdn.com/w40/br.png', 'ru': 'https://flagcdn.com/w40/ru.png', 'ja': 'https://flagcdn.com/w40/jp.png', 'ko': 'https://flagcdn.com/w40/kr.png', 'zh': 'https://flagcdn.com/w40/cn.png', 'hi': 'https://flagcdn.com/w40/in.png', 'pt-br': 'https://flagcdn.com/w40/br.png', 'uk': 'https://flagcdn.com/w40/gb.png', } flag = lang_to_flag.get(language_code, 'https://blankstate.ai/wp-content/uploads/2024/01/w.gif') print("Flag: ", flag) # use emoji library and code to display the actual emoji flag #flag = emoji.emojize(flag, language='alias') return language_code, flag additional_inputs = [ gr.Textbox(label="Task", lines=1, value="", interactive= False, visible= False), gr.Slider(label="Temperature", value=0.9, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False), gr.Slider(label="Max new tokens", value=250, minimum=0, maximum=1097, step=64, interactive=True, visible= False), gr.Slider(label="Top-p", value=0.90, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False), gr.Slider(label="Repetition penalty", value=1.0, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False), gr.Slider(label="Presence penalty", value=1.5, minimum=1.0, maximum=2.0, step=0.05, interactive=True, visible= False), ] css = "style.css" company_logo = "https://blankstate.ai/wp-content/uploads/2023/11/logo_blankstate.ai_dark.png" #ey_company_logo = "https://bberry.ai/wp-content/uploads/2023/09/EY.png" company_banner = "https://blankstate.ai/wp-content/uploads/2023/11/Plan-de-travail-16@2x-8.png" """blueprint_list = [ ["AML High Risk Client Identification","bp_10_AML"], ["Corporate Governance", "bp_4"], ["Corporate Policies", "bp_1"], ["Client Service Assessment", "bp_9_CSA"], ["Risk Profile", "bp_0"], ["Financial Reporting Compliance", "bp_3"], ["Legal Breach Monitoring","bp_2"], ["Portfolio Analysis", "bp_8_PA"], ["Reputation", "bp_5"], ["Client Satisfaction", "bp_11_CH"], ["Fact Finder Requirements", "bp_12_RR_FF"] ]""" blueprint_list = [ ["Fact Find", "bp_12_RR_FF"], ["Risk Profile", "bp_13_RR_RP"], ["Suitability Report", "bp_14_RR_SR"], ["Control Report v1", "bp_18_CTR1"], ["Control Report v2", "bp_19_CTR2"], ["Control Report v3", "bp_20_CTR3"], ["Emerging Markets Equity Risk Analysis", "bp_15_RR_EMERA"], ["Emerging Markets Equity Risk Analysis - BR", "bp_15_RR_EMERA_BR"], ["Emerging Markets Equity Risk Analysis - HI", "bp_15_RR_EMERA_HI"], ["Emerging Markets Equity Risk Analysis - ZH", "bp_15_RR_EMERA_ZH"], ["Corporate Policies", "bp_1"], ["Corporate Policies - BR", "bp_16_COP_BR"], 
["Corporate Policies - ZH", "bp_16_COP_ZH"], ["DORA", "bp_17_DORA"], ] """scale_gradients = [ {"Significant": 0.9}, {"High": 0.7}, {"Moderate": 0.5}, {"Low": 0.3}, {"Grounded": 0.1} ]""" scale_gradients =[{"Select a Blueprint": 0.0}] colors = { 0: "#F7FFF7", } html_content = "<div style='background-color: #0E1428; border-radius: 10px; display: flex; justify-content: space-between;'>" for gradient in scale_gradients: label, value = list(gradient.items())[0] color = colors[min(colors.keys(), key=lambda x: abs(x - value))] html_content += f"<div style='flex: 1; text-align: center;'><span style='color: {color};'>{label}</div>" html_content += "</div>" # phantom description on market sentiment and behaviour analysis in emerging markets phantom_description = f"<center> <h3>Multilingual Behaviour Analysis</h3><p> Detecting relevant interactions and events in a self-supervised manner based on a Protocol avoiding bias and allowing discovery of new unknown interactions. </br><b>πŸ”’ No data retained nor future training required.</b> </p> </center>" with gr.Blocks(theme=gr.themes.Base()) as demo: # add gr.header where error message will be displayed with gr.Row(): with gr.Column(scale=2): with gr.Tab('Blueprint Protocole'): with gr.Row(): with gr.Column(scale=1): with gr.Group(): bp_selector = gr.Dropdown(blueprint_list, value="blueprint_list", label="Available Blueprints", interactive= True, visible= True, allow_custom_value=True) blueprint_textbox = gr.Textbox(label="Blueprint", value="", interactive= True, visible= False) with gr.Accordion("See Blueprint Details", open=False, visible=False): blueprint_structure = gr.Code(label="Selected Blueprint Structure", interactive=True, visible= True, language="json", lines=30) #blueprint_structure = gr.Dataframe(label="Selected Blueprint Structure", interactive= False, visible= True) """with gr.Tab('Generate a Blueprint'): with gr.Column(scale=2): with gr.Tab('Blueprint by Definition πŸ”'): with gr.Row(): blueprint_title = gr.Textbox(label="Title", value="Employees Wellness", interactive= True, visible= True) blueprint_description = gr.Textbox(label="Description", value="Detecting employees feeling and sentiment", interactive= True, visible= True) with gr.Tab('Blueprint by Query πŸ€” (coming)'): blueprint_query = gr.Textbox(label="Question", placeholder="[coming soon]", interactive= False, visible= True) gen_gb_btn = gr.Button(value ="Generate Blueprint ✍️", interactive= True, visible= True) blueprint_generated = gr.JSON(label="Generated Structure", interactive= False, visible= True, lines=30) with gr.Tab('User Blueprint BuilderπŸ“'): blueprint_drafted = gr.JSON(label="Blueprint Made From Scratch", interactive= False, visible= True, lines=30)""" with gr.Column(scale=6): with gr.Tab('Phantom'): phantom_html = gr.HTML(label="Description", value=phantom_description, visible=True) with gr.Column(scale=6): with gr.Row(): with gr.Column(scale=5): with gr.Row(): with gr.Column(scale=1): feed = gr.Textbox(label="Feed", value="", lines=30, interactive= True) annotate_btn = gr.Button(value="Run Phantom", interactive= True) #lang_txt = gr.Textbox(label="Main Language Detected", value="🌐", lines=1, interactive= False) with gr.Column(scale=1): gradient_scale = gr.HTML(label="Blueprint Gradient Scale", value=html_content, visible=True) feed_insight = gr.HTML(label="Feed Insight", value="Feed Insight") with gr.Column(scale=1): feed_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) feed_labels = gr.Label(label="Insight Metamarkers", 
value="", scale=1) feed_details = gr.HTML(label="Insight Feed Details", value="", visible=False) #chart_plot = gr.Plot(fig) """with gr.Column(scale=4): profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) chart_plot = gr.Plot(fig) """ with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") with gr.Tab('Batch Insight'): with gr.Tab('Historical Data'): with gr.Column(scale=6): with gr.Row(): with gr.Column(scale=6): files_batch = gr.Files(label="Upload Files", height= 600, file_types=['txt', 'docx', 'pdf']) with gr.Column(scale=6): with gr.Group(): with gr.Row(): threshold_enabled = gr.Checkbox(label="Conditional Threshold πŸ”", info="Enable to isolate and focus on specific Outcomes", value=False, interactive= True, scale = 3) threshold_operator = gr.Radio(["<", ">"], label=f"Threshold Operator",visible=True, interactive= False, scale=1) threshold_range = gr.Dropdown(label=f"Threshold Range", info="Select what should be filtered 'in' or 'out' from of the dataframe display", visible=True, interactive= False, scale=2) with gr.Row(): n_dependency = gr.Checkbox(label="Additive Dependency", info="Enable if your metamarkers are likely to interact between each other so their combined appearance (de)escalates outcome", visible=True, interactive= False) """ Uncomment to add manually triggers with gr.Column(scale=6): triggers_num = gr.Slider(0, max_triggers, value=0, label="Triggers", step=1) triggers = [] for i in range(max_triggers): with gr.Row(): trigger_label = gr.Textbox(label=f"Trigger Label {i+1}", visible=False, interactive= True) operator = gr.Radio(["<", ">"], label=f"Operator {i+1}",visible=False, interactive= True) threshold = gr.Slider(label=f"Threshold {i+1}", value=0.5, minimum=0.0, maximum=1.0, step=0.01, visible=False, interactive= True) triggers.append(trigger_label) triggers.append(operator) triggers.append(threshold) triggers_num.change(variable_outputs, triggers_num, triggers) for j in range(max_triggers*3): #print("Triggers j: ", j) triggers[j].change(get_triggers_list_values, [triggers[0], triggers[1], triggers[2], triggers[3], triggers[4], triggers[5], triggers[6], triggers[7], triggers[8]]) """ #operator.input(variable_outputs, triggers_num, triggers).then(get_triggers_list_values) #threshold.input(variable_outputs, triggers_num, triggers).then(get_triggers_list_values) with gr.Column(scale=6): with gr.Group(): get_rationale = gr.Checkbox(label="Generate Written Rationale", info="Outcome will take slightly longer.", value=False, interactive= True, scale=5) extract_insight_btn = gr.Button(value ="Extract Insight 🧠", interactive= False) with gr.Column(scale=6): insight_df = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) with gr.Group(): with gr.Row(): save_df_btn = gr.Button(value ="Save Data Frame Result πŸ’Ύ", visible=True, interactive= False, scale=1) with gr.Row(): csv_export = gr.File(label="Export DataFrame", scale=5, visible=False) dl_df_btn = gr.Button(value ="Download Data Frame πŸ“₯", visible=False, interactive= True, scale=1) copy_df_btn = gr.Button(value ="Copy Data Frame πŸ“‹", visible=False, interactive= True, scale=1) files_insight_extract = gr.HTML(label="Insight Extracted", value="") with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, 
elem_id="terminal", interactive= False, label="Shell Log") with gr.Tab('E.V.A'): with gr.Column(scale=6): temp_slider = gr.Slider(label="Temperature", value=0.3, minimum=0.0, maximum=1.0, step=0.05, interactive=True, visible= False) max_new_tokens_slider = gr.Slider(label="Max new tokens", value=350, minimum=0, maximum=1097, step=64, interactive=True, visible= False) top_p_slider =gr.Slider(label="Top-p", value=0.3, minimum=0.0, maximum=1, step=0.05, interactive=True, visible= False) rep_pen_slider = gr.Slider(label="Repetition penalty", value=0.87, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False) pres_pen_slider = gr.Slider(label="Presence penalty", value=0, minimum=0, maximum=2.0, step=0.05, interactive=True, visible= False) copilot_instruct = gr.Textbox(label="Instruct", visible= False, value="Read Carefully the Background. Base your answers and rationale on the Background provided. If no Background provided, answer that you need more data to provide an answer. 1. FIRST ALWAYS state a concrete: Yes, Maybe/Unclear or No at the start of your answer. 2. Follow IMMEDIATELY with 1-2 sentence reasoning citing evidence. You will be asked a PRIMARY core question, POSSIBLY accompanied by extra context. Analyze any additional context if present, BUT base your FINAL determination PRIMARILY on confirmation of the KEY DETAILS in the MAIN PRIMARY question itself: If BACKGROUND confirms the KEY PRIMARY DETAILS, mark Yes and cite quote evidence. If SOME but NOT all PRIMARY DETAILS can be confirmed, mark Maybe/Unclear and cite partial evidence. ONLY mark No IF 20% or less of the PRIMARY question details have confirmation and provide a reasoning.", placeholder="Instruct", lines=1, interactive= True) copilot_entry = gr.Textbox(label="How can I help?", value="", placeholder="Gain Insight in Results", lines=1, interactive= True) copilot_insight_result = gr.HTML(label="blank_ Insight", value="") with gr.Row(): with gr.Column(scale=6): #rationale = gr.Textbox(label="Captured Rationale", value="", placeholder="Rationale", lines= 15, interactive= False) saved_insight_df_gr = gr.Dataframe(headers=["Metamarkers", "Output", "Gradient Match", "Rationale", "Source"], col_count=(5, "fixed"), wrap=True, visible= True, interactive= True) empty_df_btn = gr.Button(value ="Empty Data Frame πŸ—‘οΈ", visible=True, interactive= False, scale=1) with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") with gr.Tab('Real-time Guardrails'): with gr.Column(scale=6): with gr.Row(): with gr.Column(scale=4): chatbot = gr.ChatInterface(generate,additional_inputs=additional_inputs) #with gr.Row(): # profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) # profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) with gr.Column(scale=2): profile_score = gr.Textbox(label="Insight General Score", value="", lines=1, interactive= False) profile_labels = gr.Label(label="Insight Metamarkers", value="", scale=1) chart_plot = gr.Plot(fig) with gr.Column(scale=6): terminal = gr.Code(language="shell", lines=4, elem_id="terminal", interactive= False, label="Shell Log") #buildBP.click(build_structure, [title, gradients, blueprint, metamarkers]) bp_selector.select(fn=get_bp, inputs=bp_selector, outputs=[blueprint_textbox, blueprint_structure, phantom_html, gradient_scale, threshold_enabled, extract_insight_btn]) threshold_enabled.change(fn=threshold_condition, inputs=[threshold_enabled, 
blueprint_structure], outputs=[threshold_range, threshold_operator, n_dependency]) #dl_df_btn.click(df_to_csv, inputs=[insight_df], outputs=[fileobj]) #copy_df_btn.click(copy_df, inputs=[insight_df]) chatbot.textbox.submit(fn=update_scores,inputs=[chatbot.textbox, blueprint_structure], outputs=[profile_score, profile_labels, chart_plot]) """gen_gb_btn.click( generate_bp, inputs=[blueprint_title, blueprint_description], outputs=[blueprint_generated], api_name=False, queue=False, )""" extract_insight_btn.click( extract_insight, inputs=[files_batch, blueprint_structure, get_rationale, threshold_enabled, threshold_range, threshold_operator, n_dependency], outputs=[files_insight_extract, insight_df, save_df_btn], api_name=False, queue=False, ) save_df_btn.click( save_df, inputs=[insight_df], outputs=[saved_insight_df_gr, empty_df_btn], api_name=False, queue=False, ) empty_df_btn.click( empty_saved_df, inputs=[], outputs=[saved_insight_df_gr], api_name=False, queue=False, ) # call funciton when copilot_entry is submitted copilot_entry.submit( get_copilot_insight, inputs=[copilot_entry, saved_insight_df_gr, temp_slider, max_new_tokens_slider, top_p_slider, rep_pen_slider, pres_pen_slider, copilot_instruct], outputs=[copilot_insight_result], api_name=False, queue=False, ) feed.submit( annotate, inputs=[feed, blueprint_structure], outputs=[feed_insight, feed_details, feed_score, feed_labels], api_name=False, queue=False, ) annotate_btn.click( annotate, inputs=[feed, blueprint_structure], outputs=[feed_insight, feed_details, feed_score, feed_labels], api_name=False, queue=False, ) if __name__ == "__main__": demo.launch()
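# --- Illustrative sketch (not wired into the UI above) -----------------------
# A minimal, self-contained example of the gradient matching used in
# extract_insight(): a weighted score is mapped to the gradient label with the
# highest threshold that the score still reaches. The helper name
# `match_gradient` is illustrative and does not appear elsewhere in this script.
def match_gradient(score, gradients):
    """Return the gradient label whose threshold is the highest value <= score."""
    outcome, highest = "Uncat", 0
    for gradient in gradients:
        for label, threshold in gradient.items():
            if score >= threshold and threshold >= highest:
                outcome, highest = label, threshold
    return outcome

# Example: a weighted average of 0.65 against the "Corporate Policies" scale
# (Significant 1.0 / High 0.8 / Moderate 0.6 / Low 0.4 / Minimal 0.2)
# resolves to "Moderate".
#print(match_gradient(0.65, bp_ground_struct["gradients"]))  # -> "Moderate"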