Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import List, Tuple, Optional, Dict, Any | |
| import io | |
| import os | |
| import tempfile | |
| import numpy as np | |
| import mne | |
| from scipy.io import loadmat | |
| try: | |
| import h5py # MAT v7.3 (HDF5) | |
| except Exception: # pragma: no cover | |
| h5py = None | |
| # ============================================================ | |
| # EEGLAB loader (.set + .fdt) | |
| # ============================================================ | |
| def pick_set_fdt(files) -> Tuple[Optional[object], Optional[object]]: | |
| """ | |
| Streamlitの accept_multiple_files=True で受け取ったfilesから .set と .fdt を拾う。 | |
| Returns: (set_file, fdt_file) | |
| """ | |
| set_file = None | |
| fdt_file = None | |
| for f in files: | |
| name = (getattr(f, "name", "") or "").lower() | |
| if name.endswith(".set"): | |
| set_file = f | |
| elif name.endswith(".fdt"): | |
| fdt_file = f | |
| return set_file, fdt_file | |
| def same_stem(a_name: str, b_name: str) -> bool: | |
| """Check if two filenames have the same stem (basename without extension).""" | |
| a_stem = os.path.splitext(os.path.basename(a_name))[0] | |
| b_stem = os.path.splitext(os.path.basename(b_name))[0] | |
| return a_stem == b_stem | |
| def extract_electrode_positions_from_hdf5(set_path: str) -> tuple: | |
| """ | |
| HDF5形式のEEGLABファイルから電極位置を抽出。 | |
| Returns: | |
| tuple: (pos_2d, pos_3d) | |
| pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone | |
| pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone | |
| """ | |
| if h5py is None: | |
| return None, None | |
| try: | |
| with h5py.File(set_path, "r") as f: | |
| # EEGLABのchanlocs構造を探す | |
| chanlocs_path = None | |
| for path in ["EEG/chanlocs", "chanlocs"]: | |
| if path in f: | |
| chanlocs_path = path | |
| break | |
| if chanlocs_path is None: | |
| return None, None | |
| chanlocs = f[chanlocs_path] | |
| # X, Y, Z座標を取得 | |
| xs, ys, zs = [], [], [] | |
| # パターン1: chanlocs/X, chanlocs/Y, chanlocs/Z が直接データの場合 | |
| if "X" in chanlocs and "Y" in chanlocs and "Z" in chanlocs: | |
| x_data = chanlocs["X"][()] | |
| y_data = chanlocs["Y"][()] | |
| z_data = chanlocs["Z"][()] | |
| # 参照型の場合は各参照を辿る | |
| if x_data.dtype == h5py.ref_dtype: | |
| for i in range(len(x_data)): | |
| try: | |
| x_val = f[x_data[i, 0]][()] | |
| y_val = f[y_data[i, 0]][()] | |
| z_val = f[z_data[i, 0]][()] | |
| # スカラー値を取得 | |
| x_val = float(x_val.flat[0]) if hasattr(x_val, 'flat') else float(x_val) | |
| y_val = float(y_val.flat[0]) if hasattr(y_val, 'flat') else float(y_val) | |
| z_val = float(z_val.flat[0]) if hasattr(z_val, 'flat') else float(z_val) | |
| xs.append(x_val) | |
| ys.append(y_val) | |
| zs.append(z_val) | |
| except: | |
| # 読み込めない座標は0に | |
| xs.append(0.0) | |
| ys.append(0.0) | |
| zs.append(0.0) | |
| else: | |
| # 直接数値データの場合 | |
| xs = x_data.flatten().astype(float) | |
| ys = y_data.flatten().astype(float) | |
| zs = z_data.flatten().astype(float) | |
| else: | |
| return None, None | |
| # リストをnumpy配列に変換 | |
| xs = np.array(xs, dtype=float) | |
| ys = np.array(ys, dtype=float) | |
| zs = np.array(zs, dtype=float) | |
| if len(xs) == 0: | |
| return None, None | |
| # NaN値をチェック(数値型に変換後) | |
| valid_mask = ~(np.isnan(xs) | np.isnan(ys) | np.isnan(zs)) | |
| if not np.any(valid_mask): | |
| return None, None | |
| # 無効な座標は平均値で置き換え | |
| if not np.all(valid_mask): | |
| xs[~valid_mask] = np.nanmean(xs) | |
| ys[~valid_mask] = np.nanmean(ys) | |
| zs[~valid_mask] = np.nanmean(zs) | |
| # 3D座標を構築 | |
| positions_3d = np.column_stack([xs, ys, zs]) | |
| # 正規化 | |
| dists = np.sqrt(np.sum(positions_3d**2, axis=1)) | |
| max_dist_3d = np.max(dists[dists > 0]) if np.any(dists > 0) else 1.0 | |
| if max_dist_3d > 0: | |
| positions_3d = positions_3d / max_dist_3d | |
| # 2D投影 | |
| pos_2d = positions_3d[:, :2] | |
| dists_2d = np.sqrt(np.sum(pos_2d**2, axis=1)) | |
| max_dist_2d = np.max(dists_2d[dists_2d > 0]) if np.any(dists_2d > 0) else 1.0 | |
| if max_dist_2d > 0: | |
| pos_2d = pos_2d / max_dist_2d * 0.85 | |
| print(f"HDF5から電極位置を取得: {len(xs)} channels") | |
| return pos_2d.astype(np.float32), positions_3d.astype(np.float32) | |
| except Exception as e: | |
| print(f"HDF5から電極位置の抽出に失敗: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| return None, None | |
| def extract_electrode_positions_2d(set_path: str): | |
| """ | |
| EEGLABファイルから電極位置(2D, 3D)を抽出。 | |
| Returns: | |
| tuple: (pos_2d, pos_3d) | |
| pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone | |
| pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone | |
| """ | |
| try: | |
| # MNEで読み込み | |
| raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False) | |
| montage = raw.get_montage() | |
| if montage is None: | |
| return None, None | |
| # 3D座標を取得 | |
| pos_3d_dict = montage.get_positions()['ch_pos'] | |
| if not pos_3d_dict: | |
| return None, None | |
| # チャンネル名順に並べ替え | |
| ch_names = raw.ch_names | |
| positions_3d = [] | |
| for ch_name in ch_names: | |
| if ch_name in pos_3d_dict: | |
| positions_3d.append(pos_3d_dict[ch_name]) | |
| else: | |
| # 座標がないチャンネルは原点に配置 | |
| positions_3d.append([0, 0, 0]) | |
| positions_3d = np.array(positions_3d) | |
| # 3D座標を正規化 | |
| max_dist_3d = np.max(np.sqrt(np.sum(positions_3d**2, axis=1))) | |
| if max_dist_3d > 0: | |
| positions_3d = positions_3d / max_dist_3d | |
| # 3D -> 2D 投影(上から見た図) | |
| pos_2d = positions_3d[:, :2] | |
| # 2D座標を正規化: 最大距離が0.85になるようにスケーリング | |
| max_dist_2d = np.max(np.sqrt(np.sum(pos_2d**2, axis=1))) | |
| if max_dist_2d > 0: | |
| pos_2d = pos_2d / max_dist_2d * 0.85 | |
| return pos_2d.astype(np.float32), positions_3d.astype(np.float32) | |
| except Exception as e: | |
| print(f"電極位置の抽出に失敗: {e}") | |
| return None, None | |
| def _load_eeglab_hdf5(set_path: str, fdt_path: Optional[str] = None, debug: bool = False): | |
| """ | |
| Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py. | |
| Returns: (x_tc, fs) where x_tc is (T, C) | |
| """ | |
| if h5py is None: | |
| raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。") | |
| with h5py.File(set_path, "r") as f: | |
| # デバッグ: ファイル構造を表示 | |
| if debug: | |
| print("=== HDF5 file structure ===") | |
| def print_structure(name, obj): | |
| if isinstance(obj, h5py.Dataset): | |
| print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}") | |
| elif isinstance(obj, h5py.Group): | |
| print(f"Group: {name}") | |
| f.visititems(print_structure) | |
| print("===========================") | |
| # サンプリングレートを取得 | |
| fs = None | |
| for path in ["EEG/srate", "srate"]: | |
| if path in f: | |
| srate_data = f[path] | |
| if isinstance(srate_data, h5py.Dataset): | |
| val = srate_data[()] | |
| # 配列の場合は最初の要素を取得 | |
| fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val) | |
| break | |
| if fs is None: | |
| raise ValueError("サンプリングレート (srate) が見つかりません") | |
| # チャンネル数を取得 | |
| nbchan = None | |
| for path in ["EEG/nbchan", "nbchan"]: | |
| if path in f: | |
| nbchan_data = f[path] | |
| if isinstance(nbchan_data, h5py.Dataset): | |
| val = nbchan_data[()] | |
| nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val) | |
| break | |
| # サンプル数を取得 | |
| pnts = None | |
| for path in ["EEG/pnts", "pnts"]: | |
| if path in f: | |
| pnts_data = f[path] | |
| if isinstance(pnts_data, h5py.Dataset): | |
| val = pnts_data[()] | |
| pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val) | |
| break | |
| if debug: | |
| print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}") | |
| # データを取得 - まず .set 内を確認 | |
| data = None | |
| data_shape = None | |
| if debug: | |
| print(f"Checking for data, fdt_path provided: {fdt_path is not None}") | |
| if fdt_path: | |
| print(f"fdt_path exists: {os.path.exists(fdt_path)}") | |
| # パターン1: EEG/data が参照配列の場合、各参照を辿る | |
| if "EEG" in f and "data" in f["EEG"]: | |
| data_ref = f["EEG"]["data"] | |
| if isinstance(data_ref, h5py.Dataset): | |
| if debug: | |
| print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}") | |
| if data_ref.dtype == h5py.ref_dtype: | |
| # 参照の場合 - 通常は .fdt ファイルを指す | |
| if debug: | |
| print("EEG/data is reference type - data should be in .fdt file") | |
| # .fdt ファイルが必要 | |
| if fdt_path is not None and os.path.exists(fdt_path): | |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) | |
| else: | |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") | |
| elif data_ref.size > 100: # 参照配列ではなく実データ | |
| data = data_ref[()] | |
| data_shape = data.shape | |
| if debug: | |
| print(f"EEG/data contains actual data, shape: {data_shape}") | |
| else: | |
| # 小さい配列 = 参照リスト、.fdtファイルが必要 | |
| if debug: | |
| print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt") | |
| if fdt_path is not None and os.path.exists(fdt_path): | |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) | |
| else: | |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") | |
| # パターン2: 直接 data | |
| if data is None and "data" in f: | |
| data_obj = f["data"] | |
| if isinstance(data_obj, h5py.Dataset): | |
| data = data_obj[()] | |
| data_shape = data.shape | |
| if data is None: | |
| raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。") | |
| if debug: | |
| print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}") | |
| # データの形状を調整 | |
| if data.ndim != 2: | |
| raise ValueError(f"予期しないデータ次元: {data.ndim}") | |
| dim0, dim1 = data.shape | |
| # nbchan情報があればそれを使う | |
| if nbchan is not None: | |
| if dim0 == nbchan: | |
| # (C, T) 形式 | |
| x_tc = data.T.astype(np.float32) | |
| elif dim1 == nbchan: | |
| # (T, C) 形式 | |
| x_tc = data.astype(np.float32) | |
| else: | |
| # nbchanと一致しない場合は小さい方をチャンネル数と仮定 | |
| if dim0 < dim1: | |
| x_tc = data.T.astype(np.float32) | |
| else: | |
| x_tc = data.astype(np.float32) | |
| else: | |
| # 一般的な判定: 小さい方がチャンネル数 | |
| if dim0 < dim1: | |
| x_tc = data.T.astype(np.float32) | |
| else: | |
| x_tc = data.astype(np.float32) | |
| if debug: | |
| print(f"Final shape (T, C): {x_tc.shape}") | |
| return x_tc, fs | |
| """ | |
| EEGLABファイルから電極位置(2D)を抽出。 | |
| Returns: | |
| pos: (C, 2) 電極の2D座標、取得できない場合はNone | |
| """ | |
| try: | |
| # MNEで読み込み | |
| raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False) | |
| montage = raw.get_montage() | |
| if montage is None: | |
| return None | |
| # 3D座標を取得 | |
| pos_3d = montage.get_positions()['ch_pos'] | |
| if not pos_3d: | |
| return None | |
| # チャンネル名順に並べ替え | |
| ch_names = raw.ch_names | |
| positions = [] | |
| for ch_name in ch_names: | |
| if ch_name in pos_3d: | |
| positions.append(pos_3d[ch_name]) | |
| else: | |
| # 座標がないチャンネルは原点に配置 | |
| positions.append([0, 0, 0]) | |
| positions = np.array(positions) | |
| # 3D -> 2D 投影(上から見た図) | |
| # x, y座標を使用し、正規化 | |
| pos_2d = positions[:, :2] | |
| # 正規化: 最大距離が1になるようにスケーリング | |
| max_dist = np.max(np.sqrt(np.sum(pos_2d**2, axis=1))) | |
| if max_dist > 0: | |
| pos_2d = pos_2d / max_dist * 0.85 # 0.85倍で頭の輪郭内に収める | |
| return pos_2d.astype(np.float32) | |
| except Exception as e: | |
| print(f"電極位置の抽出に失敗: {e}") | |
| return None | |
| """ | |
| Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py. | |
| Returns: (x_tc, fs) where x_tc is (T, C) | |
| """ | |
| if h5py is None: | |
| raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。") | |
| with h5py.File(set_path, "r") as f: | |
| # デバッグ: ファイル構造を表示 | |
| if debug: | |
| print("=== HDF5 file structure ===") | |
| def print_structure(name, obj): | |
| if isinstance(obj, h5py.Dataset): | |
| print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}") | |
| elif isinstance(obj, h5py.Group): | |
| print(f"Group: {name}") | |
| f.visititems(print_structure) | |
| print("===========================") | |
| # サンプリングレートを取得 | |
| fs = None | |
| for path in ["EEG/srate", "srate"]: | |
| if path in f: | |
| srate_data = f[path] | |
| if isinstance(srate_data, h5py.Dataset): | |
| val = srate_data[()] | |
| # 配列の場合は最初の要素を取得 | |
| fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val) | |
| break | |
| if fs is None: | |
| raise ValueError("サンプリングレート (srate) が見つかりません") | |
| # チャンネル数を取得 | |
| nbchan = None | |
| for path in ["EEG/nbchan", "nbchan"]: | |
| if path in f: | |
| nbchan_data = f[path] | |
| if isinstance(nbchan_data, h5py.Dataset): | |
| val = nbchan_data[()] | |
| nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val) | |
| break | |
| # サンプル数を取得 | |
| pnts = None | |
| for path in ["EEG/pnts", "pnts"]: | |
| if path in f: | |
| pnts_data = f[path] | |
| if isinstance(pnts_data, h5py.Dataset): | |
| val = pnts_data[()] | |
| pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val) | |
| break | |
| if debug: | |
| print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}") | |
| # データを取得 - まず .set 内を確認 | |
| data = None | |
| data_shape = None | |
| if debug: | |
| print(f"Checking for data, fdt_path provided: {fdt_path is not None}") | |
| if fdt_path: | |
| print(f"fdt_path exists: {os.path.exists(fdt_path)}") | |
| # パターン1: EEG/data が参照配列の場合、各参照を辿る | |
| if "EEG" in f and "data" in f["EEG"]: | |
| data_ref = f["EEG"]["data"] | |
| if isinstance(data_ref, h5py.Dataset): | |
| if debug: | |
| print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}") | |
| if data_ref.dtype == h5py.ref_dtype: | |
| # 参照の場合 - 通常は .fdt ファイルを指す | |
| if debug: | |
| print("EEG/data is reference type - data should be in .fdt file") | |
| # .fdt ファイルが必要 | |
| if fdt_path is not None and os.path.exists(fdt_path): | |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) | |
| else: | |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") | |
| elif data_ref.size > 100: # 参照配列ではなく実データ | |
| data = data_ref[()] | |
| data_shape = data.shape | |
| if debug: | |
| print(f"EEG/data contains actual data, shape: {data_shape}") | |
| else: | |
| # 小さい配列 = 参照リスト、.fdtファイルが必要 | |
| if debug: | |
| print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt") | |
| if fdt_path is not None and os.path.exists(fdt_path): | |
| data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug) | |
| else: | |
| raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。") | |
| # パターン2: 直接 data | |
| if data is None and "data" in f: | |
| data_obj = f["data"] | |
| if isinstance(data_obj, h5py.Dataset): | |
| data = data_obj[()] | |
| data_shape = data.shape | |
| if data is None: | |
| raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。") | |
| if debug: | |
| print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}") | |
| # データの形状を調整 | |
| if data.ndim != 2: | |
| raise ValueError(f"予期しないデータ次元: {data.ndim}") | |
| dim0, dim1 = data.shape | |
| # nbchan情報があればそれを使う | |
| if nbchan is not None: | |
| if dim0 == nbchan: | |
| # (C, T) 形式 | |
| x_tc = data.T.astype(np.float32) | |
| elif dim1 == nbchan: | |
| # (T, C) 形式 | |
| x_tc = data.astype(np.float32) | |
| else: | |
| # nbchanと一致しない場合は小さい方をチャンネル数と仮定 | |
| if dim0 < dim1: | |
| x_tc = data.T.astype(np.float32) | |
| else: | |
| x_tc = data.astype(np.float32) | |
| else: | |
| # 一般的な判定: 小さい方がチャンネル数 | |
| if dim0 < dim1: | |
| x_tc = data.T.astype(np.float32) | |
| else: | |
| x_tc = data.astype(np.float32) | |
| if debug: | |
| print(f"Final shape (T, C): {x_tc.shape}") | |
| return x_tc, fs | |
| def _load_fdt_file(fdt_path: str, nbchan: Optional[int], pnts: Optional[int], debug: bool = False) -> np.ndarray: | |
| """ | |
| Load .fdt file (raw binary float32 data). | |
| EEGLAB .fdt files are stored as float32 in (C, T) order. | |
| """ | |
| if debug: | |
| print(f"Loading .fdt file: {fdt_path}") | |
| # .fdt ファイルは float32 のバイナリデータ | |
| data = np.fromfile(fdt_path, dtype=np.float32) | |
| if debug: | |
| print(f"Loaded {data.size} float32 values from .fdt") | |
| # チャンネル数とサンプル数がわかっている場合はリシェイプ | |
| if nbchan is not None and pnts is not None: | |
| expected_size = nbchan * pnts | |
| if data.size == expected_size: | |
| # EEGLAB は (C, T) 順で保存 | |
| data = data.reshape(nbchan, pnts) | |
| if debug: | |
| print(f"Reshaped to ({nbchan}, {pnts})") | |
| else: | |
| if debug: | |
| print(f"Warning: expected {expected_size} values but got {data.size}") | |
| # 可能な限りリシェイプを試みる | |
| if data.size % nbchan == 0: | |
| data = data.reshape(nbchan, -1) | |
| elif data.size % pnts == 0: | |
| data = data.reshape(-1, pnts) | |
| else: | |
| raise ValueError(f"Cannot reshape data of size {data.size} with nbchan={nbchan}, pnts={pnts}") | |
| else: | |
| raise ValueError("nbchan と pnts の情報が必要です") | |
| return data | |
| def load_eeglab_tc_from_bytes( | |
| set_bytes: bytes, | |
| set_name: str, | |
| fdt_bytes: Optional[bytes] = None, | |
| fdt_name: Optional[str] = None, | |
| ): | |
| """ | |
| Load EEGLAB .set (and optional .fdt) from bytes using MNE or h5py. | |
| Returns: | |
| tuple: (x_tc, fs, electrode_pos_2d, electrode_pos_3d) | |
| x_tc: (T, C) float32 | |
| fs: sampling rate (Hz) | |
| electrode_pos_2d: (C, 2) float32 or None - 電極の2D座標 | |
| electrode_pos_3d: (C, 3) float32 or None - 電極の3D座標 | |
| Notes: | |
| - 多くのEEGLABは .set が .fdt を参照するため、同じディレクトリに同名で置く必要があります。 | |
| - .set単体で完結している場合は fdt_* を省略可能にしています。 | |
| - MATLAB v7.3 (HDF5) 形式の .set にも対応しています。 | |
| """ | |
| if fdt_bytes is not None or fdt_name is not None: | |
| if fdt_bytes is None or fdt_name is None: | |
| raise ValueError("fdt_bytes と fdt_name は両方指定してください。") | |
| if not same_stem(set_name, fdt_name): | |
| raise ValueError(f".set と .fdt のファイル名(拡張子除く)が一致していません: {set_name} vs {fdt_name}") | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| set_path = os.path.join(tmpdir, os.path.basename(set_name)) | |
| with open(set_path, "wb") as f: | |
| f.write(set_bytes) | |
| fdt_path = None # 初期化 | |
| if fdt_bytes is not None and fdt_name is not None: | |
| fdt_path = os.path.join(tmpdir, os.path.basename(fdt_name)) | |
| with open(fdt_path, "wb") as f: | |
| f.write(fdt_bytes) | |
| # 1) Rawとして読む(通常のEEGLAB形式) | |
| try: | |
| raw = mne.io.read_raw_eeglab(set_path, preload=True, verbose=False) | |
| fs = float(raw.info["sfreq"]) | |
| x_tc = raw.get_data().T # (T,C) | |
| # 電極位置を取得 | |
| result = extract_electrode_positions_2d(set_path) | |
| if result is not None: | |
| electrode_pos_2d, electrode_pos_3d = result | |
| else: | |
| electrode_pos_2d, electrode_pos_3d = None, None | |
| return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d | |
| except Exception as e_raw: | |
| # 2) Epochsとして読む(エポックデータ用) | |
| try: | |
| epochs = mne.io.read_epochs_eeglab(set_path, verbose=False, montage_units="cm") | |
| fs = float(epochs.info["sfreq"]) | |
| x = epochs.get_data(copy=True) # (n_epochs, n_channels, n_times) | |
| # ここは方針を選ぶ:平均 or 連結 | |
| x_mean = x.mean(axis=0) # (C,T) | |
| x_tc = x_mean.T # (T,C) | |
| # 電極位置を取得(epochsからも取得可能) | |
| result = extract_electrode_positions_2d(set_path) | |
| if result is not None: | |
| electrode_pos_2d, electrode_pos_3d = result | |
| else: | |
| electrode_pos_2d, electrode_pos_3d = None, None | |
| return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d | |
| except Exception as e_ep: | |
| # 3) HDF5形式として読む(MATLAB v7.3) | |
| try: | |
| # デバッグモードを有効化(環境変数で制御可能) | |
| debug = os.environ.get("EEGLAB_DEBUG", "0") == "1" | |
| # Streamlit環境では常にデバッグ情報を表示 | |
| import sys | |
| if 'streamlit' in sys.modules: | |
| debug = True | |
| try: | |
| x_tc, fs = _load_eeglab_hdf5(set_path, fdt_path=fdt_path, debug=debug) | |
| except Exception as e_hdf5_inner: | |
| import traceback | |
| print("HDF5読み込みの詳細エラー:") | |
| print(traceback.format_exc()) | |
| raise e_hdf5_inner | |
| # HDF5の場合、電極位置をHDF5から直接取得を試みる | |
| electrode_pos_2d, electrode_pos_3d = extract_electrode_positions_from_hdf5(set_path) | |
| if debug and electrode_pos_2d is not None: | |
| print(f"HDF5から電極位置を取得しました: {electrode_pos_2d.shape}") | |
| return x_tc, fs, electrode_pos_2d, electrode_pos_3d | |
| except Exception as e_hdf5: | |
| import traceback | |
| # すべて失敗した場合 | |
| msg = ( | |
| "EEGLABの読み込みに失敗しました。\n" | |
| f"- read_raw_eeglab error: {e_raw}\n" | |
| f"- read_epochs_eeglab error: {e_ep}\n" | |
| f"- HDF5読み込み error: {e_hdf5}\n" | |
| f"\n詳細トレースバック:\n{traceback.format_exc()}" | |
| ) | |
| raise RuntimeError(msg) from e_hdf5 | |
| # ============================================================ | |
| # MAT loader (.mat) | |
| # ============================================================ | |
| def _mat_keys_loadmat(mat_dict: Dict[str, Any]) -> List[str]: | |
| return sorted([k for k in mat_dict.keys() if not k.startswith("__")]) | |
| def _try_get_numeric_arrays_loadmat(mat_dict: Dict[str, Any]) -> Dict[str, np.ndarray]: | |
| """ | |
| loadmatで読んだdictから、1D/2Dの数値ndarrayだけ抽出して返す。 | |
| 3次元配列も含める(エポックデータの可能性)。 | |
| """ | |
| out: Dict[str, np.ndarray] = {} | |
| for k in _mat_keys_loadmat(mat_dict): | |
| v = mat_dict[k] | |
| if isinstance(v, np.ndarray) and v.size > 0: | |
| # 数値型かどうかチェック | |
| if np.issubdtype(v.dtype, np.number): | |
| if v.ndim in (1, 2): | |
| out[k] = v | |
| elif v.ndim == 3: | |
| # 3次元配列の場合は (epochs, channels, time) の可能性 | |
| # 平均を取って2次元にする、または連結する | |
| out[k + "_mean"] = v.mean(axis=0) # (C, T) | |
| out[k + "_concat"] = v.reshape(-1, v.shape[-1]) # (epochs*C, T) | |
| return out | |
| def _load_mat_v72(bytes_data: bytes) -> Dict[str, Any]: | |
| # v7.2以前のMAT(一般的なMAT) | |
| return loadmat(io.BytesIO(bytes_data), squeeze_me=False, struct_as_record=False) | |
| def _load_mat_v73_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]: | |
| """ | |
| v7.3(HDF5)のMATから、数値1D/2D/3D dataset を拾って返す。 | |
| keyは HDF5内のパスになります(例: 'group/data')。 | |
| 修正: h5pyの新しいバージョンに対応。BytesIOではなく一時ファイルを使用。 | |
| """ | |
| if h5py is None: | |
| raise RuntimeError("MAT v7.3(HDF5) 形式の可能性がありますが、h5py が入っていません。pip install h5py を実行してください。") | |
| out: Dict[str, np.ndarray] = {} | |
| # h5pyの新しいバージョンではBytesIOから直接開けない場合があるため、一時ファイルを使用 | |
| with tempfile.NamedTemporaryFile(suffix='.mat', delete=False) as tmp: | |
| tmp.write(bytes_data) | |
| tmp_path = tmp.name | |
| try: | |
| with h5py.File(tmp_path, "r") as f: | |
| def visitor(name, obj): | |
| if not isinstance(obj, h5py.Dataset): | |
| return | |
| try: | |
| arr = obj[()] | |
| except Exception: | |
| return | |
| # MATLABの文字列/参照等は除外して、数値だけ | |
| if isinstance(arr, np.ndarray) and arr.size > 0 and np.issubdtype(arr.dtype, np.number): | |
| if arr.ndim in (1, 2): | |
| out[name] = arr | |
| elif arr.ndim == 3: | |
| # 3次元配列も含める | |
| out[name + "_mean"] = arr.mean(axis=0) | |
| out[name + "_concat"] = arr.reshape(-1, arr.shape[-1]) | |
| f.visititems(lambda name, obj: visitor(name, obj)) | |
| finally: | |
| # 一時ファイルを削除 | |
| try: | |
| os.unlink(tmp_path) | |
| except Exception: | |
| pass | |
| return out | |
| def load_mat_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]: | |
| """ | |
| Return dict: variable_name -> ndarray(1D/2D numeric) | |
| Tries v7.2 (scipy.io.loadmat). If it fails, tries v7.3 (h5py). | |
| """ | |
| try: | |
| md = _load_mat_v72(bytes_data) | |
| cands = _try_get_numeric_arrays_loadmat(md) | |
| return cands | |
| except Exception: | |
| return _load_mat_v73_candidates(bytes_data) |