| | import streamlit.components.v1 as components |
| | import streamlit as st |
| | from random import randrange, uniform |
| | import pandas as pd |
| | import logging |
| | import numpy as np |
| | import random |
| | from datetime import datetime, timedelta |
| | from babel.numbers import format_currency |
| |
|
| | |
| | COL_NAMES = [ |
| | "Transaction date", |
| | "Transaction type", |
| | "Amount transferred", |
| | "Sender's initial balance", |
| | "Sender's new balance", |
| | "Recipient's initial balance", |
| | "Recipient's new balance", |
| | "Sender exactly credited", |
| | "Receiver exactly credited", |
| | "Large amount", |
| | "Frequent receiver", |
| | "Merchant receiver", |
| | "Sender ID", |
| | "Receiver ID", |
| | ] |
| |
|
| | |
| | feature_texts = { |
| | 0: "Date of transaction", |
| | 1: "Amount transferred", |
| | 2: "Initial balance of sender", |
| | 3: "New balance of sender", |
| | 4: "Initial balance of recipient", |
| | 5: "New balance of recipient", |
| | 6: "Sender's balance was exactly credited", |
| | 7: "Receiver's balance was exactly credited", |
| | 8: "Large amount", |
| | 9: "Frequent receiver of transactions", |
| | 10: "Receiver is merchant", |
| | 11: "Sender ID", |
| | 12: "Receiver ID", |
| | 13: "Transaction type is Cash out", |
| | 14: "Transaction type is Transfer", |
| | 15: "Transaction type is Payment", |
| | 16: "Transaction type is Cash in", |
| | 17: "Transaction type is Debit", |
| | } |
| |
|
| | |
| | CATEGORIES = np.array(["CASH_OUT", "TRANSFER", "PAYMENT", "CASH_IN", "DEBIT"]) |
| |
|
| |
|
| | |
| | def transformation(input, categories): |
| | new_x = input |
| | cat = np.array(input[1]) |
| | del new_x[1] |
| | result_array = np.zeros(5, dtype=int) |
| | match_index = np.where(categories == cat)[0] |
| | result_array[match_index] = 1 |
| | new_x.extend(result_array.tolist()) |
| | python_objects = [ |
| | np_type.item() if isinstance(np_type, np.generic) else np_type |
| | for np_type in new_x |
| | ] |
| | return python_objects |
| |
|
| |
|
| | |
| | def get_request_body(datapoint): |
| | data = datapoint.iloc[0].tolist() |
| | instances = [int(x) if isinstance(x, (np.int32, np.int64)) else x for x in data] |
| | request_body = {"instances": [instances]} |
| | return request_body |
| |
|
| |
|
| | |
| | def get_explainability_texts(shap_values, feature_texts): |
| | |
| | positive_dict = {index: val for index, val in enumerate(shap_values) if val > 0} |
| | |
| | sorted_positive_indices = [ |
| | index |
| | for index, _ in sorted( |
| | positive_dict.items(), key=lambda item: abs(item[1]), reverse=True |
| | ) |
| | ] |
| | positive_texts = [feature_texts[x] for x in sorted_positive_indices] |
| | positive_texts = positive_texts[2:] |
| | sorted_positive_indices = sorted_positive_indices[2:] |
| | if len(positive_texts) > 5: |
| | positive_texts = positive_texts[:5] |
| | sorted_positive_indices = sorted_positive_indices[:5] |
| | return positive_texts, sorted_positive_indices |
| |
|
| |
|
| | |
| | |
| | def random_past_date_from_last_year(): |
| | one_year_ago = datetime.now() - timedelta(days=365) |
| | random_days = random.randint(0, (datetime.now() - one_year_ago).days) |
| | random_date = one_year_ago + timedelta(days=random_days) |
| | return random_date.strftime("%Y-%m-%d") |
| |
|
| |
|
| | |
| | def get_explainability_values(pos_indices, data): |
| | rounded_data = [ |
| | round(value, 2) if isinstance(value, float) else value for value in data |
| | ] |
| | transformed_data = transformation(input=rounded_data, categories=CATEGORIES) |
| | vals = [] |
| | for idx in pos_indices: |
| | if idx in range(6, 11) or idx in range(13, 18): |
| | val = str(bool(transformed_data[idx])).capitalize() |
| | else: |
| | val = transformed_data[idx] |
| | vals.append(val) |
| | return vals |
| |
|
| |
|
| | |
| | def modify_datapoint( |
| | datapoint, |
| | ): |
| | data = datapoint.iloc[0].tolist() |
| | data[0] = random_past_date_from_last_year() |
| | modified_amounts = data.copy() |
| | if any(val > 12000 for val in data[2:7]): |
| | modified_amounts[2:7] = [ |
| | value / 100 if value != 0 else 0 for value in data[2:7] |
| | ] |
| | if any(val > 120000 for val in modified_amounts[2:7]): |
| | new_list = [value / 10 if value != 0 else 0 for value in modified_amounts[2:7]] |
| | modified_amounts[2:7] = new_list |
| | rounded_data = [ |
| | round(value, 2) if isinstance(value, float) else value |
| | for value in modified_amounts |
| | ] |
| | rounded_data[2:7] = [ |
| | format_currency(value, "EUR", locale="en_GB") for value in rounded_data[2:7] |
| | ] |
| | return rounded_data |
| |
|
| |
|
| | |
| | def get_weights(shap_values, sorted_indices, target_sum=0.95): |
| | weights = [shap_values[x] for x in sorted_indices] |
| | total_sum = sum(weights) |
| | |
| | scaled_values = [val * (target_sum / total_sum) for val in weights] |
| | return scaled_values |
| |
|
| |
|
| | |
| | def get_fake_certainty(): |
| | |
| | fake_certainty = uniform(0.75, 0.99) |
| | formatted_fake_certainty = "{:.2%}".format(fake_certainty) |
| | return formatted_fake_certainty |
| |
|
| |
|
| | |
| | def get_random_suspicious_transaction(data): |
| | suspicious_data = data[data["isFraud"] == 1] |
| | max_n = len(suspicious_data) |
| | random_nr = randrange(max_n) |
| | suspicious_transaction = suspicious_data[random_nr - 1 : random_nr].drop( |
| | "isFraud", axis=1 |
| | ) |
| | return suspicious_transaction |
| |
|
| |
|
| | |
| | def send_evaluation( |
| | client, deployment_id, request_log_id, prediction_log_id, evaluation_input |
| | ): |
| | """Send evaluation to Deeploy.""" |
| | try: |
| | with st.spinner("Submitting response..."): |
| | |
| | client.evaluate( |
| | deployment_id, |
| | prediction_log_id, |
| | evaluation_input |
| | ) |
| | return True |
| | except Exception as e: |
| | logging.error(e) |
| | st.error( |
| | "Failed to submit feedback." |
| | + "Check whether you are using the right model URL and Token. " |
| | + "Contact Deeploy if the problem persists." |
| | ) |
| | st.write(f"Error message: {e}") |
| |
|
| |
|
| | |
| | def get_model_url(): |
| | """Get model url and retrieve workspace id and deployment id from it""" |
| | model_url = st.text_area( |
| | "Model URL (default is the demo deployment)", |
| | "https://api.app.deeploy.ml/workspaces/708b5808-27af-461a-8ee5-80add68384c7/deployments/ac56dbdf-ba04-462f-aa70-5a0d18698e42/", |
| | height=125, |
| | ) |
| | elems = model_url.split("/") |
| | try: |
| | workspace_id = elems[4] |
| | deployment_id = elems[6] |
| | except IndexError: |
| | workspace_id = "" |
| | deployment_id = "" |
| | return model_url, workspace_id, deployment_id |
| |
|
| |
|
| | |
| | def get_comment_explanation(certainty, explainability_texts, explainability_values): |
| | cleaned = [x.replace(":", "") for x in explainability_texts] |
| | fi = [f"{cleaned[i]} is {x}" for i, x in enumerate(explainability_values)] |
| | fi.insert(0, "Important suspicious features: ") |
| | result = "\n".join(fi) |
| | comment = f"Model certainty is {certainty}" + "\n" "\n" + result |
| | return comment |
| |
|
| |
|
| | |
| | def create_data_input_table(data, col_names): |
| | st.subheader("Transaction details") |
| | data[7:12] = [bool(value) for value in data[7:12]] |
| | rounded_list = [ |
| | round(value, 2) if isinstance(value, float) else value for value in data |
| | ] |
| | df = pd.DataFrame({"Feature name": col_names, "Value": rounded_list}) |
| | st.dataframe( |
| | df, hide_index=True, width=475, height=35 * len(df) + 38 |
| | ) |
| |
|
| |
|
| | |
| | def create_table(texts, values, weights, title): |
| | df = pd.DataFrame( |
| | {"Feature Explanation": texts, "Value": values, "Weight": weights} |
| | ) |
| | st.markdown(f"#### {title}") |
| | st.dataframe( |
| | df, |
| | hide_index=True, |
| | width=475, |
| | column_config={ |
| | "Weight": st.column_config.ProgressColumn( |
| | "Weight", width="small", format="%.2f", min_value=0, max_value=1 |
| | ) |
| | }, |
| | ) |
| |
|
| |
|
| | |
| | def ChangeButtonColour(widget_label, font_color, background_color="transparent"): |
| | htmlstr = f""" |
| | <script> |
| | var elements = window.parent.document.querySelectorAll('button'); |
| | for (var i = 0; i < elements.length; ++i) {{ |
| | if (elements[i].innerText == '{widget_label}') {{ |
| | elements[i].style.color ='{font_color}'; |
| | elements[i].style.background = '{background_color}' |
| | }} |
| | }} |
| | </script> |
| | """ |
| | components.html(f"{htmlstr}", height=0, width=0) |
| |
|