| import nltk |
| import os |
| import json |
| import math |
| import re |
| import gradio as gr |
| from collections import defaultdict, Counter |
| from nltk.tokenize import word_tokenize |
| from nltk.stem import PorterStemmer, WordNetLemmatizer |
|
|
# Fetch the NLTK data packages this script requests, in a fixed order.
# NOTE(review): presumably "punkt"/"punkt_tab" back word_tokenize and
# "wordnet" backs WordNetLemmatizer - confirm against the NLTK docs.
for _resource in ("punkt", "wordnet", "punkt_tab"):
    nltk.download(_resource)
|
|
# Words dropped from documents and free-text queries before any term
# statistics are computed.
stop_words = {
    "a", "is", "the", "of", "all", "and", "to", "can", "be",
    "as", "once", "for", "at", "am", "are", "has", "have", "had",
    "up", "his", "her", "in", "on", "no", "we", "do",
}
|
|
# Load the raw document and query collections from JSON files that are
# resolved relative to the current working directory.
with open("docs.json", encoding="utf-8") as docs_file:
    docs_ds = json.load(docs_file)

with open("queries.json", encoding="utf-8") as queries_file:
    queries_ds = json.load(queries_file)

# Re-key both collections as {integer id: raw text}.
documents = {int(entry["doc_id"]): entry["text"] for entry in docs_ds}
queries = {int(entry["query_id"]): entry["text"] for entry in queries_ds}
|
|
def _new_positions_by_doc():
    """Return an empty ``{doc_id: [positions]}`` mapping for one term."""
    return defaultdict(list)


# Module-level retrieval state, filled in by process_documents().
idf_scores = {}  # term -> inverse document frequency
tf_idf_vectors = defaultdict(dict)  # doc_id -> {term: tf-idf weight}
inverted_index = defaultdict(set)  # term -> set of doc_ids containing it
positional_index = defaultdict(_new_positions_by_doc)  # term -> doc_id -> positions
|
|
def process_documents(documents):
    """Populate every module-level index from the given corpus.

    Each document is lower-cased and tokenized; tokens that are not purely
    alphanumeric or that appear in ``stop_words`` are dropped, and the rest
    are lemmatized. From that filtered list the function fills:

    * ``inverted_index`` / ``positional_index`` - keyed by the Porter stem of
      each lemma; positions are offsets into the *filtered* token list.
    * ``idf_scores`` - ``ln(N / df)`` per lemma, where ``N`` is the number of
      documents and ``df`` the number of documents containing the lemma.
    * ``tf_idf_vectors`` - per document, raw lemma count times its idf.

    Args:
        documents: Mapping of document id to raw text.

    Returns:
        None. All results are written to the module-level structures.
    """
    porter = PorterStemmer()
    wordnet = WordNetLemmatizer()
    document_frequency = Counter()
    counts_by_doc = {}

    for doc_id, text in documents.items():
        lemmas = [
            wordnet.lemmatize(token)
            for token in word_tokenize(text.lower())
            if token.isalnum() and token not in stop_words
        ]
        counts_by_doc[doc_id] = Counter(lemmas)

        for position, lemma in enumerate(lemmas):
            stem = porter.stem(lemma)
            inverted_index[stem].add(doc_id)
            positional_index[stem][doc_id].append(position)

        # Each distinct lemma counts once per document.
        document_frequency.update(set(lemmas))

    corpus_size = len(documents)
    idf_scores.update(
        (lemma, math.log(corpus_size / df))
        for lemma, df in document_frequency.items()
    )

    for doc_id, counts in counts_by_doc.items():
        tf_idf_vectors[doc_id] = {
            lemma: frequency * idf_scores[lemma]
            for lemma, frequency in counts.items()
        }
|
|
def execute_boolean_query(query, documents):
    """Evaluate a flat boolean query against ``inverted_index``.

    The query is a whitespace-separated sequence of terms joined by ``and`` /
    ``or``, where any term may be preceded by ``not`` (all case-insensitive).
    Negations are bound to the term that follows them; the binary operators
    are then applied strictly left to right, with no precedence rules and no
    parentheses.

    Query terms are lemmatized and then stemmed, which is the same
    normalization ``process_documents`` applies to index keys, so irregular
    forms resolve to the same key on both sides.

    Args:
        query: The boolean query string.
        documents: Mapping of document id to text. Only its keys are used, as
            the universe that ``not`` complements against.

    Returns:
        A sorted list of matching document ids. Empty or malformed queries
        (an operator with a missing operand, a dangling ``not``) return ``[]``
        instead of raising.
    """
    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    all_doc_ids = set(documents)

    operands = []
    binary_ops = []
    negate_next = False
    for token in query.lower().split():
        if token == 'not':
            # Toggle so that "not not x" cancels out.
            negate_next = not negate_next
        elif token in ('and', 'or'):
            binary_ops.append(token)
        else:
            key = stemmer.stem(lemmatizer.lemmatize(token))
            postings = inverted_index.get(key, set())
            operands.append(all_doc_ids - postings if negate_next else postings)
            negate_next = False

    # A dangling "not", or fewer operands than the operators need, means the
    # query cannot be evaluated.
    if negate_next or len(operands) <= len(binary_ops):
        return []

    result = operands[0]
    # zip() stops at the shorter sequence, so operands that are not joined by
    # an operator are ignored rather than treated as an error.
    for op, right in zip(binary_ops, operands[1:]):
        result = result & right if op == 'and' else result | right
    return sorted(result)
|
|
def execute_proximity_query(query):
    """Find documents where two terms occur within ``k`` positions of each other.

    The expected query shape is ``word1 word2 / k`` (whitespace around the
    slash is optional; surrounding whitespace is ignored). Both words are
    lemmatized and then stemmed, mirroring how ``process_documents`` builds
    the keys of ``positional_index``.

    Positions are offsets into each document's filtered token list, and a
    document matches when some pair of positions differs by at least 1 and at
    most ``k``.

    Args:
        query: The proximity query string.

    Returns:
        A sorted list of matching document ids, or ``[]`` when the query does
        not have the expected shape or either term is not indexed.
    """
    match = re.match(r'(\w+)\s+(\w+)\s*/\s*(\d+)', query.strip())
    if not match:
        return []

    first, second, distance = match.groups()
    k = int(distance)

    stemmer = PorterStemmer()
    lemmatizer = WordNetLemmatizer()
    key1 = stemmer.stem(lemmatizer.lemmatize(first.lower()))
    key2 = stemmer.stem(lemmatizer.lemmatize(second.lower()))

    # .get() avoids creating empty entries in the defaultdict for unknown terms.
    postings1 = positional_index.get(key1)
    postings2 = positional_index.get(key2)
    if not postings1 or not postings2:
        return []

    result_docs = set()
    for doc_id in postings1.keys() & postings2.keys():
        positions1 = postings1[doc_id]
        positions2 = postings2[doc_id]
        # The lower bound excludes a position matching itself when both
        # query words normalize to the same term.
        if any(0 < abs(p1 - p2) <= k for p1 in positions1 for p2 in positions2):
            result_docs.add(doc_id)
    return sorted(result_docs)
|
|
def evaluate_cosine_similarity_score(vec1, vec2):
    """Return the cosine similarity of two sparse vectors.

    Args:
        vec1: Mapping of term to numeric weight.
        vec2: Mapping of term to numeric weight.

    Returns:
        The dot product over the terms present in both mappings, divided by
        the product of the two Euclidean norms. Returns ``0.0`` when either
        vector has zero norm (including when it is empty).
    """

    def _magnitude(vector):
        return math.sqrt(sum(weight ** 2 for weight in vector.values()))

    shared_terms = set(vec1) & set(vec2)
    numerator = sum(vec1[term] * vec2[term] for term in shared_terms)

    length1 = _magnitude(vec1)
    length2 = _magnitude(vec2)
    if length1 == 0 or length2 == 0:
        return 0.0

    return numerator / (length1 * length2)
|
|
def process_query(user_input_query):
    """Convert a free-text query into a sparse tf-idf vector.

    The query is lower-cased and tokenized; tokens that are not purely
    alphanumeric or that appear in ``stop_words`` are dropped, and the rest
    are lemmatized.

    Args:
        user_input_query: The raw query string.

    Returns:
        A dict mapping each query lemma to its count in the query multiplied
        by its entry in ``idf_scores``. Lemmas with no idf entry get weight 0.
    """
    wordnet = WordNetLemmatizer()
    term_counts = Counter(
        wordnet.lemmatize(token)
        for token in word_tokenize(user_input_query.lower())
        if token.isalnum() and token not in stop_words
    )

    weights = {}
    for term, frequency in term_counts.items():
        weights[term] = frequency * idf_scores.get(term, 0)
    return weights
|
|
def execute_vsm_query(user_input_query, alpha=0.001):
    """Rank documents by cosine similarity to a free-text query.

    Args:
        user_input_query: The raw query string.
        alpha: Minimum similarity a document needs in order to be returned.

    Returns:
        Document ids whose similarity is at least ``alpha``, ordered from most
        to least similar. Documents with equal scores keep the order in which
        they appear in ``tf_idf_vectors``.
    """
    query_vector = process_query(user_input_query)

    scored = [
        (doc_id, evaluate_cosine_similarity_score(query_vector, doc_vector))
        for doc_id, doc_vector in tf_idf_vectors.items()
    ]
    kept = [pair for pair in scored if pair[1] >= alpha]
    # list.sort is stable, also with reverse=True, so ties stay in index order.
    kept.sort(key=lambda pair: pair[1], reverse=True)
    return [doc_id for doc_id, _ in kept]
|
|
# Build all indexes once, at import time, before any query is served.
process_documents(documents)
|
|
def chatbot_fn(query, method):
    """Run a query with the selected retrieval method and format the result.

    Args:
        query: The user's query text. ``None``, empty and whitespace-only
            values are rejected.
        method: One of ``"Boolean"``, ``"Proximity"`` or
            ``"Vector Space Model"``. Any other value (including ``None``,
            when no option has been chosen) yields a prompt to select a
            method rather than an error.

    Returns:
        A user-facing string: either a validation message or
        ``"Result-set: [...]"`` containing the matching document ids.
    """
    if not query or not query.strip():
        return "Query cannot be empty"

    if method == "Boolean":
        result = execute_boolean_query(query, documents)
    elif method == "Proximity":
        result = execute_proximity_query(query)
    elif method == "Vector Space Model":
        result = execute_vsm_query(query)
    else:
        # Without this branch `result` would be unbound below.
        return "Please select a retrieval method"

    return f"Result-set: {result}"
|
|
# Gradio front end: a free-text query box plus a radio selector whose value
# is passed to chatbot_fn as the retrieval method.
method_selector = gr.Radio(
    ["Boolean", "Proximity", "Vector Space Model"],
    label="Method",
)
iface = gr.Interface(
    fn=chatbot_fn,
    inputs=["text", method_selector],
    outputs="text",
    title="Information Retrieval Chatbot",
)
iface.launch()