"""
Frequency Analyzer for Word-Frequency Data

Reads cached frequency data and provides comprehensive analysis and visualization
of word frequency distributions from multiple sources (NLTK Brown corpus,
wordfreq, Google Books Ngram).
"""
| |
|
| | import os |
| | import pickle |
| | import sys |
| | import urllib.request |
| | import json |
| | import csv |
| | import numpy as np |
| | from collections import Counter |
| | from typing import Dict, List, Tuple, Optional |
| |
|
| | |
| | try: |
| | import matplotlib.pyplot as plt |
| | import matplotlib |
| | matplotlib.use('TkAgg') |
| | HAS_MATPLOTLIB = True |
| | except ImportError: |
| | HAS_MATPLOTLIB = False |
| | print("Note: matplotlib not available. Visualizations will be disabled.") |
| |
|
| | |
| | try: |
| | from wordfreq import word_frequency, zipf_frequency |
| | HAS_WORDFREQ = True |
| | except ImportError: |
| | HAS_WORDFREQ = False |
| | print("Note: wordfreq not available. External frequency sources will be disabled.") |
| |
|
| |
|
class FrequencySource:
    """A single word-frequency dataset (cached pickle, parser-backed, or wordfreq)."""

    def __init__(self, name: str, description: str, url: str = None,
                 filename: str = None, parser=None):
        self.name = name
        self.description = description
        self.url = url
        self.filename = filename
        self.parser = parser
        # Filled in by load_data() on success.
        self.frequencies = None
        self.total_words = 0
        self.unique_words = 0

    def is_available(self, cache_dir: str) -> bool:
        """Return True when this source's data can be loaded right now."""
        if self.name == 'wordfreq':
            # wordfreq needs no cache file, only the installed package.
            return HAS_WORDFREQ
        if self.filename:
            return os.path.exists(os.path.join(cache_dir, self.filename))
        return False

    def load_data(self, cache_dir: str) -> bool:
        """Populate self.frequencies and the totals; return True on success."""
        if not self.is_available(cache_dir):
            return False

        try:
            if self.name == 'wordfreq':
                # wordfreq generates its dataset on the fly; no file involved.
                self.frequencies = self.parser()
            elif self.parser:
                self.frequencies = self.parser(os.path.join(cache_dir, self.filename))
            else:
                # Default: a pickled Counter/dict in the cache directory.
                with open(os.path.join(cache_dir, self.filename), 'rb') as fh:
                    self.frequencies = pickle.load(fh)

            data = self.frequencies
            if data:
                # dicts carry counts; other containers fall back to length.
                self.total_words = sum(data.values()) if isinstance(data, dict) else len(data)
                self.unique_words = len(data)
                return True
        except Exception as exc:
            print(f"Error loading {self.name}: {exc}")

        return False
| |
|
| |
|
| | class FrequencyAnalyzer: |
| | def __init__(self, cache_dir: str = None): |
| | """Initialize frequency analyzer.""" |
| | if cache_dir is None: |
| | cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache') |
| | |
| | self.cache_dir = cache_dir |
| | self.word_frequencies = None |
| | self.total_words = 0 |
| | self.unique_words = 0 |
| | self.frequency_tiers = {} |
| | self.current_source = None |
| | |
| | |
| | self.frequency_sources = self._initialize_frequency_sources() |
| | |
| | |
| | self.load_default_source() |
| | |
| | def _initialize_frequency_sources(self) -> Dict[str, FrequencySource]: |
| | """Initialize available frequency sources.""" |
| | sources = {} |
| | |
| | |
| | sources['brown'] = FrequencySource( |
| | name='brown', |
| | description='NLTK Brown Corpus (1960s, ~1.1M words)', |
| | filename='brown_frequencies.pkl' |
| | ) |
| | |
| | |
| | if HAS_WORDFREQ: |
| | sources['wordfreq'] = FrequencySource( |
| | name='wordfreq', |
| | description='WordFreq Multi-source (Wikipedia, subtitles, news, books, web, Twitter, Reddit, ~2021)', |
| | parser=self._parse_wordfreq_data |
| | ) |
| | |
| | |
| | sources['google_ngram'] = FrequencySource( |
| | name='google_ngram', |
| | description='Google Books Ngram Corpus (v3, 2020) - Real data download required', |
| | filename='google_ngram_frequencies.pkl' |
| | ) |
| | |
| | return sources |
| | |
| | def _parse_wordfreq_data(self, filepath: str = None) -> Counter: |
| | """Parse wordfreq data - use wordfreq's own vocabulary.""" |
| | if not HAS_WORDFREQ: |
| | return None |
| | |
| | print("Generating wordfreq dataset using wordfreq's vocabulary...") |
| | |
| | |
| | try: |
| | from wordfreq import available_languages, top_n_list |
| | print(f"WordFreq available languages: {available_languages()}") |
| | |
| | |
| | |
| | print("Fetching top words from wordfreq vocabulary...") |
| | |
| | |
| | word_counts = [500000,100000, 50000, 25000] |
| | |
| | frequency_data = Counter() |
| | |
| | for max_words in word_counts: |
| | try: |
| | print(f"Attempting to fetch top {max_words:,} words...") |
| | |
| | |
| | top_words = top_n_list('en', max_words, wordlist='large') |
| | |
| | print(f"Retrieved {len(top_words):,} words from wordfreq") |
| | |
| | |
| | filtered_out_words = [] |
| | zero_freq_words = 0 |
| | total_processed = 0 |
| | |
| | |
| | for i, word in enumerate(top_words): |
| | try: |
| | |
| | freq = word_frequency(word, 'en', wordlist='large') |
| | total_processed += 1 |
| | |
| | if freq > 0: |
| | |
| | log_bin = int(-np.log10(freq)) |
| | |
| | |
| | |
| | count = int(freq * 1_000_000_000) |
| | if count > 0: |
| | frequency_data[word] = count |
| | else: |
| | |
| | frequency_data[word] = 1 |
| | filtered_out_words.append((word, freq, log_bin)) |
| | else: |
| | |
| | zero_freq_words += 1 |
| | |
| | |
| | if (i + 1) % 5000 == 0: |
| | print(f" Processed {i+1:,}/{len(top_words):,} words ({len(frequency_data):,} with frequencies)") |
| | |
| | except Exception as e: |
| | continue |
| | |
| | |
| | print(f"\nWordFreq Processing Results:") |
| | print(f" Total processed: {total_processed:,}") |
| | print(f" Words with frequencies: {len(frequency_data):,}") |
| | print(f" Words filtered out (tiny freq): {len(filtered_out_words):,}") |
| | print(f" Words with zero frequency: {zero_freq_words:,}") |
| | |
| | if filtered_out_words: |
| | print(f"\nFrequency distribution of filtered words:") |
| | bin_counts = Counter(log_bin for _, _, log_bin in filtered_out_words) |
| | for bin_num in sorted(bin_counts.keys()): |
| | print(f" Bin {bin_num} (freq ~1e-{bin_num}): {bin_counts[bin_num]:,} words") |
| | |
| | |
| | print(f"\nSample filtered words by frequency bin:") |
| | bins_sample = {} |
| | for word, freq, log_bin in filtered_out_words[:50]: |
| | if log_bin not in bins_sample: |
| | bins_sample[log_bin] = [] |
| | if len(bins_sample[log_bin]) < 3: |
| | bins_sample[log_bin].append((word, freq)) |
| | |
| | for bin_num in sorted(bins_sample.keys()): |
| | print(f" Bin {bin_num}: {', '.join(f'{w}({f:.2e})' for w, f in bins_sample[bin_num])}") |
| | |
| | if len(frequency_data) > 1000: |
| | break |
| | |
| | except Exception as e: |
| | print(f"Failed to fetch {max_words:,} words: {e}") |
| | continue |
| | |
| | if len(frequency_data) == 0: |
| | print("Fallback: generating frequencies for common words manually...") |
| | |
| | common_words = [ |
| | "the", "be", "to", "of", "and", "a", "in", "that", "have", "i", |
| | "it", "for", "not", "on", "with", "he", "as", "you", "do", "at", |
| | "this", "but", "his", "by", "from", "they", "she", "or", "an", "will", |
| | "my", "one", "all", "would", "there", "their", "what", "so", "up", "out" |
| | ] |
| | |
| | for word in common_words: |
| | try: |
| | freq = word_frequency(word, 'en', wordlist='large') |
| | if freq > 0: |
| | count = int(freq * 1_000_000) |
| | if count > 0: |
| | frequency_data[word] = count |
| | except: |
| | continue |
| | |
| | print(f"✓ Generated wordfreq dataset: {len(frequency_data):,} words with real frequencies") |
| | return frequency_data |
| | |
| | except ImportError as e: |
| | print(f"Could not access wordfreq vocabulary functions: {e}") |
| | return None |
| | |
| | def load_default_source(self): |
| | """Load the best available frequency source.""" |
| | |
| | priority_sources = ['wordfreq', 'brown'] |
| | |
| | for source_name in priority_sources: |
| | if self.switch_source(source_name): |
| | break |
| | |
| | if not self.current_source: |
| | print("Warning: No frequency sources available!") |
| | |
| | def switch_source(self, source_name: str) -> bool: |
| | """Switch to a different frequency source.""" |
| | if source_name not in self.frequency_sources: |
| | print(f"Unknown source: {source_name}") |
| | return False |
| | |
| | source = self.frequency_sources[source_name] |
| | |
| | if source.load_data(self.cache_dir): |
| | self.current_source = source |
| | self.word_frequencies = source.frequencies |
| | self.total_words = source.total_words |
| | self.unique_words = source.unique_words |
| | self.create_frequency_tiers() |
| | print(f"✓ Switched to {source.name}: {source.description}") |
| | return True |
| | else: |
| | print(f"✗ Source {source_name} not available. Use 'download {source_name}' to get it.") |
| | return False |
| | |
| | def download_source(self, source_name: str) -> bool: |
| | """Download external frequency source.""" |
| | if source_name not in self.frequency_sources: |
| | print(f"Unknown source: {source_name}") |
| | return False |
| | |
| | source = self.frequency_sources[source_name] |
| | |
| | if source_name == 'google_ngram': |
| | return self._download_google_ngram() |
| | else: |
| | print(f"Download not implemented for {source_name}") |
| | return False |
| | |
    def _download_google_ngram(self) -> bool:
        """Download and process actual Google Books Ngram frequency data.

        Fetches a pre-aggregated 1-gram CSV from the
        orgtre/google-books-ngram-frequency GitHub repository, keeps the
        plain alphabetic single-word entries, and pickles the resulting
        Counter into the cache directory. Returns True on success.
        """
        print("Downloading Google Books Ngram frequency data...")
        print("Using streaming download from github.com/orgtre/google-books-ngram-frequency")

        url = "https://raw.githubusercontent.com/orgtre/google-books-ngram-frequency/main/ngrams/1grams_english.csv"

        try:
            print(f"Downloading top entries from: {url}")
            print("Note: Processing first 100,000 entries (most frequent words)")

            # NOTE(review): shells out to `curl` even though urllib.request is
            # already imported at module level — this fails on systems without
            # curl installed. Consider urllib.request.urlopen with a timeout.
            import subprocess
            result = subprocess.run([
                'curl', '-s', '-L', '--max-time', '60', url
            ], capture_output=True, text=True, timeout=90)

            if result.returncode != 0:
                raise Exception(f"curl failed: {result.stderr}")

            content = result.stdout
            print(f"Downloaded {len(content)} characters")

            frequency_data = Counter()
            lines = content.strip().split('\n')

            print(f"Processing {len(lines)} lines...")

            # The CSV is expected to have 'ngram' and 'freq' columns.
            csv_reader = csv.DictReader(lines)

            for line_num, row in enumerate(csv_reader, 1):
                try:
                    word = row['ngram'].strip().lower()
                    freq_str = row['freq'].strip()

                    # Frequencies may contain thousands separators.
                    freq_value = float(freq_str.replace(',', ''))

                    if freq_value > 0:
                        # Keep only ASCII alphabetic words of 2+ characters.
                        if len(word) > 1 and word.isalpha() and word.isascii() and ' ' not in word:
                            frequency_data[word] = int(freq_value)

                    # Progress heartbeat every 10k lines.
                    if line_num % 10000 == 0:
                        print(f" Processed {line_num:,} lines, found {len(frequency_data):,} valid words")

                    # Presumably the file is frequency-sorted, so the head
                    # holds the most frequent words — TODO confirm.
                    if line_num >= 100000:
                        print(f" Processed first 100,000 most frequent entries")
                        break

                except (ValueError, KeyError, IndexError) as e:
                    # Skip malformed rows.
                    continue

            if len(frequency_data) > 1000:
                # Persist for later use by the 'google_ngram' source.
                cache_path = os.path.join(self.cache_dir, 'google_ngram_frequencies.pkl')
                with open(cache_path, 'wb') as f:
                    pickle.dump(frequency_data, f)

                print(f"✓ Downloaded Google Ngram data: {len(frequency_data):,} words")
                print(f"✓ Saved to: {cache_path}")
                return True
            else:
                print(f"✗ Not enough valid data found ({len(frequency_data)} words)")
                return False

        except Exception as e:
            print(f"✗ Failed to download Google Ngram data: {e}")
            return False
| | |
| | |
| | |
| | def list_sources(self): |
| | """List all available frequency sources.""" |
| | print(f"\n{'='*70}") |
| | print("AVAILABLE FREQUENCY SOURCES") |
| | print(f"{'='*70}") |
| | print(f"{'Source':<12} {'Available':<10} {'Description'}") |
| | print("-" * 70) |
| | |
| | for name, source in self.frequency_sources.items(): |
| | available = "✓ Yes" if source.is_available(self.cache_dir) else "✗ No" |
| | current = " (current)" if self.current_source and self.current_source.name == name else "" |
| | print(f"{name:<12} {available:<10} {source.description}{current}") |
| | |
| | print(f"\nCurrent source: {self.current_source.name if self.current_source else 'None'}") |
| | |
    def compare_word_across_sources(self, word: str):
        """Compare how a word is classified across different sources.

        Temporarily loads every available source to look the word up, then
        reloads the previously active source to restore analyzer state.
        """
        print(f"\n{'='*70}")
        print(f"WORD COMPARISON: '{word}'")
        print(f"{'='*70}")
        print(f"{'Source':<12} {'Count':<8} {'Frequency':<12} {'Tier':<12} {'Available'}")
        print("-" * 70)

        # Remember the active source; the loop below overwrites loaded data.
        current_source = self.current_source

        for name, source in self.frequency_sources.items():
            if source.is_available(self.cache_dir):
                if source.load_data(self.cache_dir):
                    temp_freq = source.frequencies
                    # Non-dict data falls back to its length as a total.
                    temp_total = sum(temp_freq.values()) if isinstance(temp_freq, dict) else len(temp_freq)

                    count = temp_freq.get(word.lower(), 0)
                    freq = count / temp_total if temp_total > 0 else 0.0

                    # Coarse 4-level tier used only for this comparison view
                    # (independent of the 10-tier system elsewhere).
                    if freq > 0.001:
                        tier = "very_common"
                    elif freq > 0.0001:
                        tier = "common"
                    elif freq > 0.00001:
                        tier = "uncommon"
                    else:
                        tier = "rare"

                    available = "✓"
                    print(f"{name:<12} {count:<8} {freq:<12.6f} {tier:<12} {available}")
                else:
                    print(f"{name:<12} {'N/A':<8} {'N/A':<12} {'N/A':<12} {'✗'}")
            else:
                print(f"{name:<12} {'N/A':<8} {'N/A':<12} {'N/A':<12} {'✗'}")

        # Restore the previously active source.
        # NOTE(review): the load_data() result is not checked here; if the
        # reload fails the analyzer may be left with stale or None data.
        if current_source:
            current_source.load_data(self.cache_dir)
            self.current_source = current_source
            self.word_frequencies = current_source.frequencies
            self.total_words = current_source.total_words
            self.unique_words = current_source.unique_words
| | |
| | def load_frequency_data(self) -> bool: |
| | """Load cached Brown corpus frequency data.""" |
| | freq_cache_path = os.path.join(self.cache_dir, 'brown_frequencies.pkl') |
| | |
| | if not os.path.exists(freq_cache_path): |
| | print(f"Error: Frequency cache not found at {freq_cache_path}") |
| | print("Please run the thematic word generator first to create the cache.") |
| | return False |
| | |
| | try: |
| | print("Loading frequency data from cache...") |
| | with open(freq_cache_path, 'rb') as f: |
| | self.word_frequencies = pickle.load(f) |
| | |
| | self.total_words = sum(self.word_frequencies.values()) |
| | self.unique_words = len(self.word_frequencies) |
| | |
| | print(f"✓ Loaded frequency data:") |
| | print(f" - Total word tokens: {self.total_words:,}") |
| | print(f" - Unique words: {self.unique_words:,}") |
| | |
| | return True |
| | |
| | except Exception as e: |
| | print(f"Error loading frequency cache: {e}") |
| | return False |
| | |
| | def create_frequency_tiers(self): |
| | """Create detailed frequency tier classifications with 10 bins.""" |
| | if not self.word_frequencies: |
| | return |
| | |
| | tiers = {} |
| | most_common = self.word_frequencies.most_common(50000) |
| | |
| | |
| | all_counts = [count for word, count in self.word_frequencies.items()] |
| | all_counts.sort(reverse=True) |
| | |
| | |
| | tier_definitions = [ |
| | ("tier_1_ultra_common", 0.999, "Ultra Common (Top 0.1%)"), |
| | ("tier_2_extremely_common", 0.995, "Extremely Common (Top 0.5%)"), |
| | ("tier_3_very_common", 0.99, "Very Common (Top 1%)"), |
| | ("tier_4_highly_common", 0.97, "Highly Common (Top 3%)"), |
| | ("tier_5_common", 0.92, "Common (Top 8%)"), |
| | ("tier_6_moderately_common", 0.85, "Moderately Common (Top 15%)"), |
| | ("tier_7_somewhat_uncommon", 0.70, "Somewhat Uncommon (Top 30%)"), |
| | ("tier_8_uncommon", 0.50, "Uncommon (Top 50%)"), |
| | ("tier_9_rare", 0.25, "Rare (Top 75%)"), |
| | ("tier_10_very_rare", 0.0, "Very Rare (Bottom 25%)") |
| | ] |
| | |
| | |
| | thresholds = [] |
| | for tier_name, percentile, description in tier_definitions: |
| | if percentile > 0: |
| | idx = int((1 - percentile) * len(all_counts)) |
| | threshold = all_counts[min(idx, len(all_counts) - 1)] |
| | else: |
| | threshold = 0 |
| | thresholds.append((tier_name, threshold, description)) |
| | |
| | |
| | for word, count in self.word_frequencies.items(): |
| | assigned = False |
| | for tier_name, threshold, description in thresholds: |
| | if count >= threshold: |
| | tiers[word] = tier_name |
| | assigned = True |
| | break |
| | |
| | if not assigned: |
| | tiers[word] = "tier_10_very_rare" |
| | |
| | self.frequency_tiers = tiers |
| | self.tier_descriptions = {name: desc for name, _, desc in thresholds} |
| | |
| | |
| | tier_counts = Counter(tiers.values()) |
| | print(f"\n✓ Frequency tier distribution (10-tier system):") |
| | |
| | |
| | tier_order = [f"tier_{i}_{name}" for i, name in enumerate([ |
| | "ultra_common", "extremely_common", "very_common", "highly_common", |
| | "common", "moderately_common", "somewhat_uncommon", "uncommon", |
| | "rare", "very_rare" |
| | ], 1)] |
| | |
| | for tier_key in tier_order: |
| | if tier_key in tier_counts: |
| | count = tier_counts[tier_key] |
| | description = self.tier_descriptions.get(tier_key, tier_key) |
| | percentage = (count / len(tiers)) * 100 |
| | print(f" - {description}: {count:,} words ({percentage:.1f}%)") |
| | |
| | def get_word_info(self, word: str) -> Tuple[int, float, str, int]: |
| | """Get detailed information about a word.""" |
| | word = word.lower() |
| | count = self.word_frequencies.get(word, 0) |
| | relative_freq = count / self.total_words if self.total_words > 0 else 0.0 |
| | tier = self.frequency_tiers.get(word, "rare") |
| | |
| | |
| | rank = 0 |
| | if count > 0: |
| | rank = sum(1 for w, c in self.word_frequencies.items() if c > count) + 1 |
| | |
| | return count, relative_freq, tier, rank |
| | |
| | def show_top_words(self, n: int = 50): |
| | """Display the most common words.""" |
| | print(f"\n{'='*60}") |
| | print(f"TOP {n} MOST COMMON WORDS") |
| | print(f"{'='*60}") |
| | print(f"{'Rank':<6} {'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier'}") |
| | print("-" * 60) |
| | |
| | for i, (word, count) in enumerate(self.word_frequencies.most_common(n)): |
| | relative_freq = count / self.total_words |
| | tier = self.frequency_tiers.get(word, "rare") |
| | print(f"{i+1:<6} {word:<15} {count:<8} {relative_freq:<12.6f} {tier}") |
| | |
| | def show_bottom_words(self, n: int = 50): |
| | """Display the least common words.""" |
| | print(f"\n{'='*60}") |
| | print(f"BOTTOM {n} LEAST COMMON WORDS") |
| | print(f"{'='*60}") |
| | print(f"{'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier'}") |
| | print("-" * 60) |
| | |
| | |
| | bottom_words = self.word_frequencies.most_common()[:-n-1:-1] |
| | |
| | for word, count in bottom_words: |
| | relative_freq = count / self.total_words |
| | tier = self.frequency_tiers.get(word, "rare") |
| | print(f"{word:<15} {count:<8} {relative_freq:<12.6f} {tier}") |
| | |
| | def show_frequency_ranges(self): |
| | """Show distribution of words across detailed frequency ranges.""" |
| | print(f"\n{'='*70}") |
| | print("DETAILED FREQUENCY RANGE DISTRIBUTION") |
| | print(f"{'='*70}") |
| | |
| | |
| | ranges = [ |
| | ("Ultra High (>1e-2)", lambda f: f > 0.01), |
| | ("Extremely High (1e-3 to 1e-2)", lambda f: 0.001 < f <= 0.01), |
| | ("Very High (1e-4 to 1e-3)", lambda f: 0.0001 < f <= 0.001), |
| | ("High (1e-5 to 1e-4)", lambda f: 0.00001 < f <= 0.0001), |
| | ("Moderately High (1e-6 to 1e-5)", lambda f: 0.000001 < f <= 0.00001), |
| | ("Medium (1e-7 to 1e-6)", lambda f: 0.0000001 < f <= 0.000001), |
| | ("Moderately Low (1e-8 to 1e-7)", lambda f: 0.00000001 < f <= 0.0000001), |
| | ("Low (1e-9 to 1e-8)", lambda f: 0.000000001 < f <= 0.00000001), |
| | ("Very Low (1e-10 to 1e-9)", lambda f: 0.0000000001 < f <= 0.000000001), |
| | ("Ultra Low (<1e-10)", lambda f: f <= 0.0000000001) |
| | ] |
| | |
| | print(f"{'Range':<30} {'Count':<10} {'Percentage'}") |
| | print("-" * 70) |
| | |
| | for range_name, condition in ranges: |
| | count = sum(1 for word, word_count in self.word_frequencies.items() |
| | if condition(word_count / self.total_words)) |
| | percentage = (count / self.unique_words) * 100 |
| | print(f"{range_name:<30} {count:>8,} words ({percentage:>5.1f}%)") |
| | |
| | def show_tier_samples(self, n: int = 5): |
| | """Show sample words from each frequency tier.""" |
| | print(f"\n{'='*80}") |
| | print(f"SAMPLE WORDS BY TIER (showing {n} per tier)") |
| | print(f"{'='*80}") |
| | |
| | |
| | tier_order = [f"tier_{i}_{name}" for i, name in enumerate([ |
| | "ultra_common", "extremely_common", "very_common", "highly_common", |
| | "common", "moderately_common", "somewhat_uncommon", "uncommon", |
| | "rare", "very_rare" |
| | ], 1)] |
| | |
| | tier_samples = {tier: [] for tier in tier_order} |
| | |
| | |
| | for word, tier in self.frequency_tiers.items(): |
| | if tier in tier_samples and len(tier_samples[tier]) < n: |
| | count, freq, _, rank = self.get_word_info(word) |
| | tier_samples[tier].append((word, count, freq, rank)) |
| | |
| | |
| | for tier in tier_order: |
| | if tier in tier_samples and tier_samples[tier]: |
| | description = self.tier_descriptions.get(tier, tier) |
| | print(f"\n{description}:") |
| | print(f"{'Word':<15} {'Count':<12} {'Frequency':<12} {'Rank'}") |
| | print("-" * 55) |
| | |
| | for word, count, freq, rank in tier_samples[tier]: |
| | print(f"{word:<15} {count:<12,} {freq:<12.8f} {rank:,}") |
| | |
| | def lookup_word(self, word: str): |
| | """Look up detailed information for a specific word.""" |
| | count, freq, tier, rank = self.get_word_info(word) |
| | |
| | print(f"\nWord: '{word}'") |
| | print(f" Count: {count:,}") |
| | print(f" Frequency: {freq:.8f}") |
| | print(f" Tier: {tier}") |
| | print(f" Rank: {rank:,} (out of {self.unique_words:,})") |
| | |
| | if count == 0: |
| | print(" Note: Word not found in Brown corpus") |
| | |
| | def batch_lookup(self, words: List[str]): |
| | """Look up multiple words and compare them.""" |
| | print(f"\n{'='*80}") |
| | print("BATCH WORD LOOKUP") |
| | print(f"{'='*80}") |
| | print(f"{'Word':<15} {'Count':<8} {'Frequency':<12} {'Tier':<12} {'Rank'}") |
| | print("-" * 80) |
| | |
| | results = [] |
| | for word in words: |
| | count, freq, tier, rank = self.get_word_info(word) |
| | results.append((word, count, freq, tier, rank)) |
| | print(f"{word:<15} {count:<8} {freq:<12.6f} {tier:<12} {rank:,}") |
| | |
| | return results |
| | |
| | def analyze_zipf_law(self): |
| | """Analyze how well the frequency distribution follows Zipf's law.""" |
| | print(f"\n{'='*60}") |
| | print("ZIPF'S LAW ANALYSIS") |
| | print(f"{'='*60}") |
| | |
| | |
| | top_words = self.word_frequencies.most_common(1000) |
| | |
| | print("Zipf's law prediction vs actual frequency (top 20 words):") |
| | print(f"{'Rank':<6} {'Word':<15} {'Actual Freq':<12} {'Zipf Pred':<12} {'Ratio'}") |
| | print("-" * 70) |
| | |
| | |
| | baseline_freq = top_words[0][1] / self.total_words |
| | |
| | for i, (word, count) in enumerate(top_words[:20]): |
| | rank = i + 1 |
| | actual_freq = count / self.total_words |
| | zipf_predicted = baseline_freq / rank |
| | ratio = actual_freq / zipf_predicted if zipf_predicted > 0 else 0 |
| | |
| | print(f"{rank:<6} {word:<15} {actual_freq:<12.6f} {zipf_predicted:<12.6f} {ratio:<8.2f}") |
| | |
    def plot_frequency_distribution(self):
        """Create visualizations of frequency distribution.

        Draws a 2x2 figure (count histogram, Zipf log-log plot, 10-tier bar
        chart, logarithmic-range bar chart), saves it as a PNG into the cache
        directory, and opens an interactive window. No-op without matplotlib.
        """
        if not HAS_MATPLOTLIB:
            print("Matplotlib not available. Skipping visualizations.")
            return

        print("\nGenerating frequency distribution plots...")

        # Descending counts drive both the histogram and the Zipf plot.
        counts = list(self.word_frequencies.values())
        counts.sort(reverse=True)

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
        source_name = self.current_source.description if self.current_source else 'Unknown Source'
        fig.suptitle(f'Frequency Analysis - {source_name}', fontsize=16)

        # Panel 1: histogram of word counts (singletons excluded), log y-axis.
        ax1.hist([c for c in counts if c > 1], bins=50, alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Word Count')
        ax1.set_ylabel('Number of Words')
        ax1.set_title('Word Count Distribution')
        ax1.set_yscale('log')

        # Panel 2: rank vs frequency on log-log axes (Zipf's law check).
        ranks = list(range(1, min(1000, len(counts)) + 1))
        top_counts = counts[:len(ranks)]
        ax2.loglog(ranks, top_counts, 'bo-', alpha=0.7, markersize=3)
        ax2.set_xlabel('Rank')
        ax2.set_ylabel('Frequency')
        ax2.set_title("Zipf's Law (Rank vs Frequency)")
        ax2.grid(True, alpha=0.3)

        # Panel 3: word counts per tier from the 10-tier classification.
        tier_counts = Counter(self.frequency_tiers.values())

        tier_order = [f"tier_{i}_{name}" for i, name in enumerate([
            "ultra_common", "extremely_common", "very_common", "highly_common",
            "common", "moderately_common", "somewhat_uncommon", "uncommon",
            "rare", "very_rare"
        ], 1)]

        tier_labels = [f"T{i}" for i in range(1, 11)]
        tier_values = [tier_counts.get(tier, 0) for tier in tier_order]

        bars = ax3.bar(tier_labels, tier_values, alpha=0.7, edgecolor='black')
        ax3.set_ylabel('Number of Words')
        ax3.set_title('10-Tier Frequency Distribution')
        ax3.set_xlabel('Frequency Tiers (T1=Ultra Common → T10=Very Rare)')

        # Color the tier bars on a red-to-green gradient (reversed colormap).
        import matplotlib.cm as cm
        colors = cm.RdYlGn_r(np.linspace(0, 1, len(bars)))
        for bar, color in zip(bars, colors):
            bar.set_color(color)

        # Panel 4: words per order-of-magnitude frequency range (mirrors
        # the decade conditions used in show_frequency_ranges()).
        ranges = [
            "Ultra\nHigh", "Extremely\nHigh", "Very\nHigh", "High", "Mod.\nHigh",
            "Medium", "Mod.\nLow", "Low", "Very\nLow", "Ultra\nLow"
        ]

        range_counts = []
        conditions = [
            lambda f: f > 0.01,
            lambda f: 0.001 < f <= 0.01,
            lambda f: 0.0001 < f <= 0.001,
            lambda f: 0.00001 < f <= 0.0001,
            lambda f: 0.000001 < f <= 0.00001,
            lambda f: 0.0000001 < f <= 0.000001,
            lambda f: 0.00000001 < f <= 0.0000001,
            lambda f: 0.000000001 < f <= 0.00000001,
            lambda f: 0.0000000001 < f <= 0.000000001,
            lambda f: f <= 0.0000000001
        ]

        for condition in conditions:
            count = sum(1 for word, word_count in self.word_frequencies.items()
                        if condition(word_count / self.total_words))
            range_counts.append(count)

        bars4 = ax4.bar(ranges, range_counts, alpha=0.7, edgecolor='black')
        ax4.set_ylabel('Number of Words')
        ax4.set_title('Logarithmic Frequency Ranges')
        ax4.tick_params(axis='x', rotation=45)

        colors4 = cm.viridis(np.linspace(0, 1, len(bars4)))
        for bar, color in zip(bars4, colors4):
            bar.set_color(color)

        plt.tight_layout()

        # Save alongside the cached data, then show interactively.
        plot_path = os.path.join(self.cache_dir, 'frequency_analysis.png')
        plt.savefig(plot_path, dpi=300, bbox_inches='tight')
        print(f"✓ Saved plots to: {plot_path}")

        plt.show()
| | |
    def interactive_mode(self):
        """Run interactive analysis mode.

        Simple REPL: reads one command per line from stdin and dispatches to
        the matching analysis method. Exits on 'quit' or Ctrl-C; other
        exceptions are reported and the loop continues.
        """
        print(f"\n{'='*60}")
        print("INTERACTIVE FREQUENCY ANALYZER")
        print(f"{'='*60}")
        print("Commands:")
        print(" lookup <word> - Look up word frequency")
        print(" batch <w1,w2,w3> - Look up multiple words")
        print(" top [n] - Show top n most common words")
        print(" bottom [n] - Show bottom n least common words")
        print(" ranges - Show frequency range distribution")
        print(" tiers - Show sample words by tier")
        print(" zipf - Analyze Zipf's law")
        print(" plot - Generate visualizations")
        print(" stats - Show basic statistics")
        print(" sources - List available frequency sources")
        print(" source <name> - Switch to frequency source")
        print(" download <source> - Download/create frequency source")
        print(" compare <word> - Compare word across sources")
        print(" help - Show this help message")
        print(" quit - Exit")
        print("-" * 60)

        while True:
            try:
                cmd = input("\nfreq> ").strip()

                if cmd.lower() == 'quit':
                    break

                parts = cmd.split()
                if not parts:
                    continue

                command = parts[0].lower()

                if command == 'lookup' and len(parts) > 1:
                    self.lookup_word(parts[1])

                elif command == 'batch' and len(parts) > 1:
                    # Re-join so comma-separated lists may contain spaces.
                    words = [w.strip() for w in ' '.join(parts[1:]).split(',')]
                    self.batch_lookup(words)

                elif command == 'top':
                    n = int(parts[1]) if len(parts) > 1 else 20
                    self.show_top_words(n)

                elif command == 'bottom':
                    n = int(parts[1]) if len(parts) > 1 else 20
                    self.show_bottom_words(n)

                elif command == 'ranges':
                    self.show_frequency_ranges()

                elif command == 'tiers':
                    self.show_tier_samples()

                elif command == 'zipf':
                    self.analyze_zipf_law()

                elif command == 'plot':
                    self.plot_frequency_distribution()

                elif command == 'stats':
                    print(f"\nBasic Statistics:")
                    print(f" Current source: {self.current_source.name if self.current_source else 'None'}")
                    print(f" Total word tokens: {self.total_words:,}")
                    print(f" Unique words: {self.unique_words:,}")
                    if self.word_frequencies:
                        print(f" Average word length: {sum(len(w) for w in self.word_frequencies) / self.unique_words:.1f}")

                        most_common_word, most_common_count = self.word_frequencies.most_common(1)[0]
                        print(f" Most common word: '{most_common_word}' ({most_common_count:,} times)")

                elif command == 'sources':
                    self.list_sources()

                elif command == 'source' and len(parts) > 1:
                    self.switch_source(parts[1])

                elif command == 'download' and len(parts) > 1:
                    if self.download_source(parts[1]):
                        print(f"✓ Downloaded {parts[1]}. Use 'source {parts[1]}' to switch to it.")

                elif command == 'compare' and len(parts) > 1:
                    self.compare_word_across_sources(parts[1])

                elif command == 'help':
                    print(f"\n{'='*60}")
                    print("AVAILABLE COMMANDS")
                    print(f"{'='*60}")
                    print(" lookup <word> - Look up word frequency")
                    print(" batch <w1,w2,w3> - Look up multiple words")
                    print(" top [n] - Show top n most common words")
                    print(" bottom [n] - Show bottom n least common words")
                    print(" ranges - Show frequency range distribution")
                    print(" tiers - Show sample words by tier")
                    print(" zipf - Analyze Zipf's law")
                    print(" plot - Generate visualizations")
                    print(" stats - Show basic statistics")
                    print(" sources - List available frequency sources")
                    print(" source <name> - Switch to frequency source")
                    print(" download <source> - Download/create frequency source")
                    print(" compare <word> - Compare word across sources")
                    print(" help - Show this help message")
                    print(" quit - Exit")

                else:
                    print("Unknown command. Type 'help' for available commands or 'quit' to exit.")

            except KeyboardInterrupt:
                break
            except Exception as e:
                # Keep the REPL alive on bad input (e.g. non-numeric 'top x').
                print(f"Error: {e}")

        print("\nGoodbye!")
| |
|
| |
|
def main():
    """Main function.

    Parses a single positional CLI command (plus optional argument) and
    dispatches to the matching FrequencyAnalyzer method; with no arguments
    it prints an overview and drops into interactive mode. Exits with
    status 1 when the cache directory or frequency data is missing.
    """
    # The analyzer requires a pre-built cache directory next to this script.
    cache_dir = os.path.join(os.path.dirname(__file__), 'model_cache')
    if not os.path.exists(cache_dir):
        print(f"Error: Cache directory not found: {cache_dir}")
        print("Please run the thematic word generator first to create the cache.")
        sys.exit(1)

    # Constructing the analyzer loads the best available source immediately.
    analyzer = FrequencyAnalyzer(cache_dir)

    if not analyzer.word_frequencies:
        print("Failed to load frequency data. Exiting.")
        sys.exit(1)

    # Command-line dispatch.
    if len(sys.argv) > 1:
        command = sys.argv[1].lower()

        if command == 'stats':
            print(f"\nBrown Corpus Statistics:")
            print(f" Total word tokens: {analyzer.total_words:,}")
            print(f" Unique words: {analyzer.unique_words:,}")

        elif command == 'top':
            n = int(sys.argv[2]) if len(sys.argv) > 2 else 20
            analyzer.show_top_words(n)

        elif command == 'bottom':
            n = int(sys.argv[2]) if len(sys.argv) > 2 else 20
            analyzer.show_bottom_words(n)

        elif command == 'ranges':
            analyzer.show_frequency_ranges()

        elif command == 'tiers':
            analyzer.show_tier_samples()

        elif command == 'zipf':
            analyzer.analyze_zipf_law()

        elif command == 'plot':
            analyzer.plot_frequency_distribution()

        elif command == 'lookup' and len(sys.argv) > 2:
            analyzer.lookup_word(sys.argv[2])

        elif command == 'interactive':
            analyzer.interactive_mode()

        elif command == 'sources':
            analyzer.list_sources()

        elif command == 'download' and len(sys.argv) > 2:
            source_name = sys.argv[2]
            if analyzer.download_source(source_name):
                print(f"✓ Downloaded {source_name}. Use 'source {source_name}' to switch to it.")

        elif command == 'source' and len(sys.argv) > 2:
            analyzer.switch_source(sys.argv[2])

        elif command == 'compare' and len(sys.argv) > 2:
            analyzer.compare_word_across_sources(sys.argv[2])

        elif command == 'help':
            print("\nAvailable commands:")
            print(" stats - Show basic frequency statistics")
            print(" top [n] - Show top n most common words")
            print(" bottom [n] - Show bottom n least common words")
            print(" ranges - Show frequency range distribution")
            print(" tiers - Show sample words by tier")
            print(" zipf - Analyze Zipf's law")
            print(" plot - Generate visualizations")
            print(" sources - List available frequency sources")
            print(" download <source> - Download/create frequency source")
            print(" source <name> - Switch to frequency source")
            print(" compare <word> - Compare word across sources")
            print(" lookup <word> - Look up word frequency")
            print(" interactive - Enter interactive mode")
            print(" help - Show this help message")

        else:
            print("Usage: python frequency_analyzer.py [help|stats|top|bottom|ranges|tiers|zipf|plot|sources|download <source>|source <name>|compare <word>|lookup <word>|interactive]")

    else:
        # No arguments: short overview, then the REPL.
        print(f"\nBrown Corpus Overview:")
        print(f" Total tokens: {analyzer.total_words:,}")
        print(f" Unique words: {analyzer.unique_words:,}")

        analyzer.show_top_words(10)
        analyzer.show_tier_samples(5)
        analyzer.interactive_mode()
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|