| """ |
| ane_lora_trainer.py — LoRA training engine using Apple Neural Engine. |
| |
| Manages per-layer LoRA adapters (A & B matrices), compiles ANE kernels once, |
| and runs forward/backward passes on ANE hardware. Training loop: |
| 1. Forward: base model inference via MLX, with LoRA additions via ANE |
| 2. Loss: cross-entropy computed on CPU |
| 3. Backward: LoRA gradients computed on ANE |
| 4. Update: Adam optimizer on CPU (LoRA params only — tiny, instant) |
| |
| The adapter weights live as numpy arrays in shared memory. MLX inference |
| reads them (zero-copy via mlx.array), ANE training writes updated values. |
| """ |
|
|
| import json |
| import logging |
| import math |
| import time |
| from pathlib import Path |
| from typing import Optional |
|
|
| import numpy as np |
|
|
| from ane_bridge_py import ANEBridge |
| from ane_mil_lora import LoRAKernelSet |
| from neural_config import NeuralConfig |
|
|
| log = logging.getLogger("ane_lora_trainer") |
|
|
|
|
| class LoRAAdapter: |
| """Per-target LoRA adapter (A & B matrices) for all layers.""" |
|
|
| def __init__(self, n_layers: int, dim: int, rank: int): |
| self.n_layers = n_layers |
| self.dim = dim |
| self.rank = rank |
|
|
| |
| |
| scale = 1.0 / math.sqrt(dim) |
| self.A = [np.random.randn(rank, dim).astype(np.float32) * scale |
| for _ in range(n_layers)] |
| self.B = [np.zeros((dim, rank), dtype=np.float32) |
| for _ in range(n_layers)] |
|
|
| def param_count(self) -> int: |
| """Total trainable parameters.""" |
| return self.n_layers * 2 * self.dim * self.rank |
|
|
| def memory_bytes(self) -> int: |
| """Total memory for adapter weights.""" |
| return self.param_count() * 4 |
|
|
|
|
| class AdamState: |
| """Adam optimizer state for LoRA parameters.""" |
|
|
| def __init__(self, adapter: LoRAAdapter, lr: float = 1e-5, |
| beta1: float = 0.9, beta2: float = 0.999, |
| eps: float = 1e-8, weight_decay: float = 0.0): |
| self.lr = lr |
| self.beta1 = beta1 |
| self.beta2 = beta2 |
| self.eps = eps |
| self.weight_decay = weight_decay |
| self.t = 0 |
|
|
| n = adapter.n_layers |
| |
| self.m_A = [np.zeros_like(adapter.A[i]) for i in range(n)] |
| self.v_A = [np.zeros_like(adapter.A[i]) for i in range(n)] |
| self.m_B = [np.zeros_like(adapter.B[i]) for i in range(n)] |
| self.v_B = [np.zeros_like(adapter.B[i]) for i in range(n)] |
|
|
| def step(self, adapter: LoRAAdapter, |
| grads_A: list[np.ndarray], grads_B: list[np.ndarray], |
| grad_clip: float = 1.0): |
| """One Adam update step for all layers. |
| |
| Args: |
| adapter: LoRA adapter to update in-place |
| grads_A: list of dA gradients per layer |
| grads_B: list of dB gradients per layer |
| grad_clip: max gradient norm (per-parameter) |
| """ |
| self.t += 1 |
| bc1 = 1 - self.beta1 ** self.t |
| bc2 = 1 - self.beta2 ** self.t |
|
|
| for i in range(adapter.n_layers): |
| for param, grad, m, v in [ |
| (adapter.A, grads_A, self.m_A, self.v_A), |
| (adapter.B, grads_B, self.m_B, self.v_B), |
| ]: |
| g = grad[i] |
|
|
| |
| gnorm = np.linalg.norm(g) |
| if gnorm > grad_clip: |
| g = g * (grad_clip / gnorm) |
|
|
| |
| if self.weight_decay > 0: |
| param[i] -= self.lr * self.weight_decay * param[i] |
|
|
| |
| m[i] = self.beta1 * m[i] + (1 - self.beta1) * g |
| v[i] = self.beta2 * v[i] + (1 - self.beta2) * g * g |
|
|
| |
| m_hat = m[i] / bc1 |
| v_hat = v[i] / bc2 |
| param[i] -= self.lr * m_hat / (np.sqrt(v_hat) + self.eps) |
|
|
|
|
| class ANELoRATrainer: |
| """Main training engine orchestrating ANE kernels + optimizer. |
| |
| Usage: |
| trainer = ANELoRATrainer(config) |
| trainer.initialize(n_layers=32, dim=3584) |
| |
| # Per-turn training |
| for input_ids, target_ids in training_data: |
| loss = trainer.train_step(activations, target_logits) |
| |
| # Save adapter |
| trainer.save_adapter("/path/to/adapter/") |
| """ |
|
|
| def __init__(self, config: NeuralConfig): |
| self.config = config |
| self.ane: Optional[ANEBridge] = None |
| self.kernels: Optional[LoRAKernelSet] = None |
| self.initialized = False |
|
|
| |
| self.adapters: dict[str, LoRAAdapter] = {} |
| self.optimizers: dict[str, AdamState] = {} |
|
|
| |
| self.total_steps = 0 |
| self.total_cycles = 0 |
| self.last_loss = float('inf') |
| self.loss_history: list[float] = [] |
| self.adapter_version = 0 |
|
|
| def initialize(self, n_layers: int, dim: int): |
| """Initialize ANE bridge, compile kernels, create adapters. |
| |
| Args: |
| n_layers: number of transformer layers |
| dim: model hidden dimension |
| """ |
| rank = self.config.lora_rank |
| seq = self.config.ane_seq_len |
| scaling = self.config.lora_scaling |
|
|
| log.info(f"Initializing ANE LoRA trainer: {n_layers} layers, " |
| f"dim={dim}, rank={rank}, seq={seq}, scaling={scaling:.2f}") |
|
|
| |
| self.ane = ANEBridge() |
| log.info(f"ANE bridge initialized (compile budget: " |
| f"{self.ane.compile_budget_remaining})") |
|
|
| |
| self.kernels = LoRAKernelSet(self.ane, dim, rank, seq, scaling) |
| log.info(f"LoRA kernels compiled (4 kernels, " |
| f"compile count: {self.ane.compile_count})") |
|
|
| |
| for target in self.config.lora_targets: |
| adapter = LoRAAdapter(n_layers, dim, rank) |
| self.adapters[target] = adapter |
| self.optimizers[target] = AdamState( |
| adapter, |
| lr=self.config.learning_rate, |
| beta1=self.config.adam_beta1, |
| beta2=self.config.adam_beta2, |
| eps=self.config.adam_eps, |
| weight_decay=self.config.weight_decay, |
| ) |
|
|
| total_params = sum(a.param_count() for a in self.adapters.values()) |
| total_mb = sum(a.memory_bytes() for a in self.adapters.values()) / 1e6 |
| log.info(f"Adapters initialized: {len(self.adapters)} targets, " |
| f"{total_params:,} params ({total_mb:.1f} MB)") |
|
|
| self.initialized = True |
| self.n_layers = n_layers |
| self.dim = dim |
|
|
| def get_adapter_weights(self, target: str, layer: int) -> tuple[np.ndarray, np.ndarray]: |
| """Get LoRA A and B matrices for a specific target and layer. |
| |
| Used by MLX inference to add LoRA contribution. |
| |
| Returns: |
| (A [rank, dim], B [dim, rank]) |
| """ |
| adapter = self.adapters[target] |
| return adapter.A[layer], adapter.B[layer] |
|
|
| def compute_lora_forward(self, target: str, layer: int, |
| x: np.ndarray) -> np.ndarray: |
| """Compute LoRA forward pass for one target in one layer on ANE. |
| |
| Args: |
| target: "q_proj" or "v_proj" |
| layer: transformer layer index |
| x: [1, dim, 1, seq] fp32 activation |
| |
| Returns: |
| [1, dim, 1, seq] fp32 LoRA output (to be added to base output) |
| """ |
| adapter = self.adapters[target] |
| return self.kernels.forward(x, adapter.A[layer], adapter.B[layer]) |
|
|
| def compute_lora_backward(self, target: str, layer: int, |
| grad_out: np.ndarray, |
| x: np.ndarray) -> tuple[np.ndarray, np.ndarray]: |
| """Compute LoRA gradients for one target in one layer on ANE. |
| |
| Args: |
| target: "q_proj" or "v_proj" |
| layer: transformer layer index |
| grad_out: [1, dim, 1, seq] fp32 upstream gradient |
| x: [1, dim, 1, seq] fp32 saved activation |
| |
| Returns: |
| (dA [rank, dim], dB [dim, rank]) |
| """ |
| adapter = self.adapters[target] |
| return self.kernels.backward( |
| grad_out, x, adapter.A[layer], adapter.B[layer]) |
|
|
| def train_step(self, layer_activations: list[np.ndarray], |
| logits: np.ndarray, target_ids: np.ndarray) -> float: |
| """One complete training step: forward + loss + backward + update. |
| |
| This is the simplified version that computes LoRA gradients |
| using a "shortcut" approach: we approximate the gradient by |
| computing dL/d(lora_output) for each layer independently, |
| treating the base model's gradient flow as given. |
| |
| For the full training loop with proper gradient propagation, |
| the neural_daemon integrates with MLX's autograd. |
| |
| Args: |
| layer_activations: list of [1, dim, 1, seq] per layer |
| (saved during MLX forward pass) |
| logits: [vocab, seq] fp32 model output logits |
| target_ids: [seq] int target token IDs |
| |
| Returns: |
| float: cross-entropy loss value |
| """ |
| if not self.initialized: |
| raise RuntimeError("Trainer not initialized") |
|
|
| |
| loss, dlogits = self._cross_entropy_backward(logits, target_ids) |
|
|
| |
| all_grads: dict[str, tuple[list[np.ndarray], list[np.ndarray]]] = {} |
|
|
| for target in self.adapters: |
| grads_A = [] |
| grads_B = [] |
|
|
| for layer_idx in range(self.n_layers): |
| |
| x = layer_activations[layer_idx] |
|
|
| |
| |
| |
| grad_out = self._approximate_layer_gradient( |
| layer_idx, dlogits, layer_activations) |
|
|
| |
| dA, dB = self.compute_lora_backward( |
| target, layer_idx, grad_out, x) |
|
|
| grads_A.append(dA) |
| grads_B.append(dB) |
|
|
| all_grads[target] = (grads_A, grads_B) |
|
|
| |
| for target, (grads_A, grads_B) in all_grads.items(): |
| self.optimizers[target].step( |
| self.adapters[target], grads_A, grads_B, |
| grad_clip=self.config.gradient_clip) |
|
|
| self.total_steps += 1 |
| self.last_loss = loss |
| self.loss_history.append(loss) |
|
|
| return loss |
|
|
| def train_micro_step_direct(self, target: str, layer: int, |
| x: np.ndarray, |
| grad_out: np.ndarray) -> tuple[float, float]: |
| """Direct micro-training step for a single layer/target. |
| |
| Called by the neural daemon when MLX provides per-layer gradients. |
| This is the primary training interface. |
| |
| Args: |
| target: "q_proj" or "v_proj" |
| layer: layer index |
| x: [1, dim, 1, seq] fp32 activation |
| grad_out: [1, dim, 1, seq] fp32 gradient from MLX |
| |
| Returns: |
| (grad_norm_A, grad_norm_B) for monitoring |
| """ |
| |
| dA, dB = self.compute_lora_backward(target, layer, grad_out, x) |
|
|
| |
| adapter = self.adapters[target] |
| optimizer = self.optimizers[target] |
|
|
| optimizer.t += 1 |
| bc1 = 1 - optimizer.beta1 ** optimizer.t |
| bc2 = 1 - optimizer.beta2 ** optimizer.t |
|
|
| grad_norm_A = float(np.linalg.norm(dA)) |
| grad_norm_B = float(np.linalg.norm(dB)) |
|
|
| for param_list, grad, m_list, v_list in [ |
| (adapter.A, dA, optimizer.m_A, optimizer.v_A), |
| (adapter.B, dB, optimizer.m_B, optimizer.v_B), |
| ]: |
| g = grad |
| gnorm = np.linalg.norm(g) |
| if gnorm > self.config.gradient_clip: |
| g = g * (self.config.gradient_clip / gnorm) |
|
|
| if self.config.weight_decay > 0: |
| param_list[layer] -= optimizer.lr * self.config.weight_decay * param_list[layer] |
|
|
| m_list[layer] = optimizer.beta1 * m_list[layer] + (1 - optimizer.beta1) * g |
| v_list[layer] = optimizer.beta2 * v_list[layer] + (1 - optimizer.beta2) * g * g |
|
|
| m_hat = m_list[layer] / bc1 |
| v_hat = v_list[layer] / bc2 |
| param_list[layer] -= optimizer.lr * m_hat / (np.sqrt(v_hat) + optimizer.eps) |
|
|
| return grad_norm_A, grad_norm_B |
|
|
| def run_training_cycle(self, layer_activations: list[np.ndarray], |
| logits: np.ndarray, target_ids: np.ndarray, |
| steps: int = 0) -> dict: |
| """Run a full micro-training cycle (multiple steps on same data). |
| |
| Args: |
| layer_activations: per-layer activations from forward pass |
| logits: model output logits |
| target_ids: target token IDs |
| steps: number of steps (0 = use config default) |
| |
| Returns: |
| dict with training metrics |
| """ |
| steps = steps or self.config.steps_per_cycle |
| start = time.time() |
| losses = [] |
|
|
| for step in range(steps): |
| loss = self.train_step(layer_activations, logits, target_ids) |
| losses.append(loss) |
|
|
| elapsed = time.time() - start |
| self.total_cycles += 1 |
|
|
| |
| if (self.config.auto_save_interval > 0 and |
| self.total_cycles % self.config.auto_save_interval == 0): |
| self.save_adapter() |
| self.adapter_version += 1 |
|
|
| return { |
| "cycle": self.total_cycles, |
| "steps": steps, |
| "initial_loss": losses[0], |
| "final_loss": losses[-1], |
| "mean_loss": float(np.mean(losses)), |
| "elapsed_sec": elapsed, |
| "steps_per_sec": steps / elapsed if elapsed > 0 else 0, |
| "adapter_version": self.adapter_version, |
| } |
|
|
| @staticmethod |
| def _cross_entropy_backward(logits: np.ndarray, |
| target_ids: np.ndarray) -> tuple[float, np.ndarray]: |
| """Compute cross-entropy loss and gradient w.r.t. logits. |
| |
| Args: |
| logits: [vocab, seq] fp32 |
| target_ids: [seq] int |
| |
| Returns: |
| (loss, dlogits [vocab, seq]) |
| """ |
| vocab, seq_len = logits.shape |
|
|
| |
| logits_shifted = logits - logits.max(axis=0, keepdims=True) |
| exp_logits = np.exp(logits_shifted) |
| probs = exp_logits / exp_logits.sum(axis=0, keepdims=True) |
|
|
| |
| target_probs = probs[target_ids, np.arange(seq_len)] |
| loss = -np.log(target_probs + 1e-10).mean() |
|
|
| |
| dlogits = probs.copy() |
| dlogits[target_ids, np.arange(seq_len)] -= 1.0 |
| dlogits /= seq_len |
|
|
| return float(loss), dlogits |
|
|
| def _approximate_layer_gradient(self, layer_idx: int, |
| dlogits: np.ndarray, |
| activations: list[np.ndarray]) -> np.ndarray: |
| """Approximate per-layer gradient for standalone training. |
| |
| Uses the layer's activation as a gradient proxy, scaled by layer depth |
| and a lightweight signal from the loss gradient. This avoids the |
| prohibitively expensive random projection from vocab-size space. |
| |
| In the full daemon, MLX computes exact gradients. |
| """ |
| seq = self.config.ane_seq_len |
| dim = self.dim |
|
|
| |
| depth_scale = (layer_idx + 1) / self.n_layers |
|
|
| |
| |
| activation = activations[layer_idx] |
| grad_magnitude = np.sqrt((dlogits ** 2).mean()) * depth_scale |
|
|
| |
| rng = np.random.RandomState(layer_idx + self.total_steps) |
| noise = rng.randn(1, dim, 1, seq).astype(np.float32) * 0.01 |
|
|
| grad = (activation * grad_magnitude + noise).astype(np.float32) |
| return grad.reshape(1, dim, 1, seq) |
|
|
| def save_adapter(self, path: str = ""): |
| """Save all adapter weights to disk.""" |
| path = path or self.config.adapter_dir |
| Path(path).mkdir(parents=True, exist_ok=True) |
|
|
| for target, adapter in self.adapters.items(): |
| target_dir = Path(path) / target |
| target_dir.mkdir(exist_ok=True) |
|
|
| for i in range(adapter.n_layers): |
| np.save(str(target_dir / f"A_{i:03d}.npy"), adapter.A[i]) |
| np.save(str(target_dir / f"B_{i:03d}.npy"), adapter.B[i]) |
|
|
| |
| meta = { |
| "n_layers": self.n_layers, |
| "dim": self.dim, |
| "rank": self.config.lora_rank, |
| "targets": list(self.adapters.keys()), |
| "total_steps": self.total_steps, |
| "total_cycles": self.total_cycles, |
| "last_loss": self.last_loss, |
| "adapter_version": self.adapter_version, |
| "timestamp": time.time(), |
| } |
| with open(Path(path) / "adapter_meta.json", "w") as f: |
| json.dump(meta, f, indent=2) |
|
|
| log.info(f"Adapter saved to {path} (v{self.adapter_version}, " |
| f"{self.total_steps} steps, loss={self.last_loss:.4f})") |
|
|
| def load_adapter(self, path: str = ""): |
| """Load adapter weights from disk.""" |
| path = path or self.config.adapter_dir |
| meta_path = Path(path) / "adapter_meta.json" |
|
|
| if not meta_path.exists(): |
| log.warning(f"No adapter found at {path}") |
| return False |
|
|
| with open(meta_path) as f: |
| meta = json.load(f) |
|
|
| for target in meta["targets"]: |
| if target not in self.adapters: |
| log.warning(f"Adapter target {target} not in current config") |
| continue |
|
|
| adapter = self.adapters[target] |
| target_dir = Path(path) / target |
|
|
| for i in range(min(meta["n_layers"], adapter.n_layers)): |
| a_path = target_dir / f"A_{i:03d}.npy" |
| b_path = target_dir / f"B_{i:03d}.npy" |
| if a_path.exists() and b_path.exists(): |
| adapter.A[i] = np.load(str(a_path)) |
| adapter.B[i] = np.load(str(b_path)) |
|
|
| self.total_steps = meta.get("total_steps", 0) |
| self.total_cycles = meta.get("total_cycles", 0) |
| self.last_loss = meta.get("last_loss", float('inf')) |
| self.adapter_version = meta.get("adapter_version", 0) |
|
|
| log.info(f"Adapter loaded from {path} (v{self.adapter_version}, " |
| f"{self.total_steps} steps)") |
| return True |
|
|
| def reset_adapter(self): |
| """Reset all adapters to initial values (fresh start).""" |
| for target, adapter in self.adapters.items(): |
| scale = 1.0 / math.sqrt(adapter.dim) |
| for i in range(adapter.n_layers): |
| adapter.A[i] = np.random.randn( |
| adapter.rank, adapter.dim).astype(np.float32) * scale |
| adapter.B[i] = np.zeros( |
| (adapter.dim, adapter.rank), dtype=np.float32) |
|
|
| |
| optimizer = self.optimizers[target] |
| optimizer.t = 0 |
| for i in range(adapter.n_layers): |
| optimizer.m_A[i].fill(0) |
| optimizer.v_A[i].fill(0) |
| optimizer.m_B[i].fill(0) |
| optimizer.v_B[i].fill(0) |
|
|
| self.total_steps = 0 |
| self.total_cycles = 0 |
| self.last_loss = float('inf') |
| self.loss_history.clear() |
| self.adapter_version += 1 |
| log.info("Adapter reset to initial values") |
|
|
| def update_learning_rate(self, lr: float): |
| """Update learning rate for all optimizers.""" |
| for opt in self.optimizers.values(): |
| opt.lr = lr |
| self.config.learning_rate = lr |
|
|
| def stats(self) -> dict: |
| """Return training statistics.""" |
| total_params = sum(a.param_count() for a in self.adapters.values()) |
| total_mb = sum(a.memory_bytes() for a in self.adapters.values()) / 1e6 |
|
|
| result = { |
| "initialized": self.initialized, |
| "total_params": total_params, |
| "adapter_memory_mb": round(total_mb, 1), |
| "targets": list(self.adapters.keys()), |
| "total_steps": self.total_steps, |
| "total_cycles": self.total_cycles, |
| "last_loss": self.last_loss, |
| "adapter_version": self.adapter_version, |
| } |
|
|
| if self.ane: |
| result["ane_compile_count"] = self.ane.compile_count |
| result["ane_compile_budget"] = self.ane.compile_budget_remaining |
|
|
| if self.loss_history: |
| recent = self.loss_history[-10:] |
| result["recent_avg_loss"] = round(float(np.mean(recent)), 4) |
|
|
| return result |
|
|
| def cleanup(self): |
| """Free ANE resources.""" |
| if self.kernels: |
| self.kernels.free() |
| self.kernels = None |
| self.initialized = False |
| log.info("ANE LoRA trainer cleaned up") |
|
|
|
|
| def self_test(): |
| """Test the training engine with a small model.""" |
| logging.basicConfig(level=logging.INFO, |
| format="%(name)s: %(message)s") |
|
|
| print("ANE LoRA Trainer Self-Test") |
| print("=" * 50) |
|
|
| config = NeuralConfig() |
| config.lora_rank = 16 |
| config.lora_targets = ["q_proj", "v_proj"] |
| config.ane_seq_len = 16 |
| config.learning_rate = 1e-4 |
| config.adapter_dir = "/tmp/jarvis_lora_test" |
| config.resolve_paths() |
|
|
| trainer = ANELoRATrainer(config) |
|
|
| |
| n_layers = 4 |
| dim = 64 |
| seq = 16 |
| vocab = 128 |
|
|
| print(f"\nInitializing: {n_layers} layers, dim={dim}, rank={config.lora_rank}") |
| trainer.initialize(n_layers, dim) |
| print(f"[OK] Initialized: {trainer.stats()['total_params']:,} params") |
|
|
| |
| print("\nTesting LoRA forward pass...") |
| x = np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.1 |
| out_q = trainer.compute_lora_forward("q_proj", 0, x) |
| out_v = trainer.compute_lora_forward("v_proj", 0, x) |
| print(f"[OK] Forward: q_proj max={np.abs(out_q).max():.6f}, " |
| f"v_proj max={np.abs(out_v).max():.6f}") |
|
|
| |
| print("\nTesting training step...") |
| activations = [np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.1 |
| for _ in range(n_layers)] |
| logits = np.random.randn(vocab, seq).astype(np.float32) |
| target_ids = np.random.randint(0, vocab, size=seq) |
|
|
| loss = trainer.train_step(activations, logits, target_ids) |
| print(f"[OK] Training step: loss={loss:.4f}") |
|
|
| |
| print("\nRunning 5 training steps...") |
| losses = [loss] |
| for _ in range(4): |
| l = trainer.train_step(activations, logits, target_ids) |
| losses.append(l) |
| print(f"[OK] Losses: {[f'{l:.4f}' for l in losses]}") |
| print(f" Steps completed: {trainer.total_steps}") |
|
|
| |
| print("\nTesting direct micro-step...") |
| grad_out = np.random.randn(1, dim, 1, seq).astype(np.float32) * 0.01 |
| gn_a, gn_b = trainer.train_micro_step_direct("q_proj", 0, x, grad_out) |
| print(f"[OK] Micro-step: grad_norm_A={gn_a:.6f}, grad_norm_B={gn_b:.6f}") |
|
|
| |
| print("\nTesting save/load...") |
| trainer.save_adapter() |
|
|
| |
| A_before, B_before = trainer.get_adapter_weights("q_proj", 0) |
| A_copy = A_before.copy() |
|
|
| |
| trainer.reset_adapter() |
| A_after, _ = trainer.get_adapter_weights("q_proj", 0) |
| assert not np.allclose(A_copy, A_after), "Reset didn't change weights" |
|
|
| |
| trainer.load_adapter() |
| A_loaded, _ = trainer.get_adapter_weights("q_proj", 0) |
| assert np.allclose(A_copy, A_loaded), "Loaded weights don't match saved" |
| print("[OK] Save/load round-trip verified") |
|
|
| |
| trainer.cleanup() |
| print(f"\n[PASS] All trainer tests passed") |
| print(f" Stats: {trainer.stats()}") |
|
|
| |
| import shutil |
| shutil.rmtree("/tmp/jarvis_lora_test", ignore_errors=True) |
|
|
| return True |
|
|
|
|
| if __name__ == "__main__": |
| success = self_test() |
| exit(0 if success else 1) |
|
|