Spaces:
Sleeping
Sleeping
File size: 31,796 Bytes
b11ec91 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 5ec4552 e648c90 b11ec91 5ec4552 b11ec91 e648c90 b11ec91 e648c90 b11ec91 5ec4552 e648c90 5ec4552 e648c90 b11ec91 5ec4552 e648c90 5ec4552 e648c90 b11ec91 5ec4552 e648c90 5ec4552 e648c90 b11ec91 e648c90 b11ec91 e648c90 b11ec91 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 | from __future__ import annotations
from typing import List, Tuple, Optional, Dict, Any
import io
import os
import tempfile
import numpy as np
import mne
from scipy.io import loadmat
try:
import h5py # MAT v7.3 (HDF5)
except Exception: # pragma: no cover
h5py = None
# ============================================================
# EEGLAB loader (.set + .fdt)
# ============================================================
def pick_set_fdt(files) -> Tuple[Optional[object], Optional[object]]:
"""
Streamlitの accept_multiple_files=True で受け取ったfilesから .set と .fdt を拾う。
Returns: (set_file, fdt_file)
"""
set_file = None
fdt_file = None
for f in files:
name = (getattr(f, "name", "") or "").lower()
if name.endswith(".set"):
set_file = f
elif name.endswith(".fdt"):
fdt_file = f
return set_file, fdt_file
def same_stem(a_name: str, b_name: str) -> bool:
"""Check if two filenames have the same stem (basename without extension)."""
a_stem = os.path.splitext(os.path.basename(a_name))[0]
b_stem = os.path.splitext(os.path.basename(b_name))[0]
return a_stem == b_stem
def extract_electrode_positions_from_hdf5(set_path: str) -> tuple:
"""
HDF5形式のEEGLABファイルから電極位置を抽出。
Returns:
tuple: (pos_2d, pos_3d)
pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone
pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone
"""
if h5py is None:
return None, None
try:
with h5py.File(set_path, "r") as f:
# EEGLABのchanlocs構造を探す
chanlocs_path = None
for path in ["EEG/chanlocs", "chanlocs"]:
if path in f:
chanlocs_path = path
break
if chanlocs_path is None:
return None, None
chanlocs = f[chanlocs_path]
# X, Y, Z座標を取得
xs, ys, zs = [], [], []
# パターン1: chanlocs/X, chanlocs/Y, chanlocs/Z が直接データの場合
if "X" in chanlocs and "Y" in chanlocs and "Z" in chanlocs:
x_data = chanlocs["X"][()]
y_data = chanlocs["Y"][()]
z_data = chanlocs["Z"][()]
# 参照型の場合は各参照を辿る
if x_data.dtype == h5py.ref_dtype:
for i in range(len(x_data)):
try:
x_val = f[x_data[i, 0]][()]
y_val = f[y_data[i, 0]][()]
z_val = f[z_data[i, 0]][()]
# スカラー値を取得
x_val = float(x_val.flat[0]) if hasattr(x_val, 'flat') else float(x_val)
y_val = float(y_val.flat[0]) if hasattr(y_val, 'flat') else float(y_val)
z_val = float(z_val.flat[0]) if hasattr(z_val, 'flat') else float(z_val)
xs.append(x_val)
ys.append(y_val)
zs.append(z_val)
except:
# 読み込めない座標は0に
xs.append(0.0)
ys.append(0.0)
zs.append(0.0)
else:
# 直接数値データの場合
xs = x_data.flatten().astype(float)
ys = y_data.flatten().astype(float)
zs = z_data.flatten().astype(float)
else:
return None, None
# リストをnumpy配列に変換
xs = np.array(xs, dtype=float)
ys = np.array(ys, dtype=float)
zs = np.array(zs, dtype=float)
if len(xs) == 0:
return None, None
# NaN値をチェック(数値型に変換後)
valid_mask = ~(np.isnan(xs) | np.isnan(ys) | np.isnan(zs))
if not np.any(valid_mask):
return None, None
# 無効な座標は平均値で置き換え
if not np.all(valid_mask):
xs[~valid_mask] = np.nanmean(xs)
ys[~valid_mask] = np.nanmean(ys)
zs[~valid_mask] = np.nanmean(zs)
# 3D座標を構築
positions_3d = np.column_stack([xs, ys, zs])
# 正規化
dists = np.sqrt(np.sum(positions_3d**2, axis=1))
max_dist_3d = np.max(dists[dists > 0]) if np.any(dists > 0) else 1.0
if max_dist_3d > 0:
positions_3d = positions_3d / max_dist_3d
# 2D投影
pos_2d = positions_3d[:, :2]
dists_2d = np.sqrt(np.sum(pos_2d**2, axis=1))
max_dist_2d = np.max(dists_2d[dists_2d > 0]) if np.any(dists_2d > 0) else 1.0
if max_dist_2d > 0:
pos_2d = pos_2d / max_dist_2d * 0.85
print(f"HDF5から電極位置を取得: {len(xs)} channels")
return pos_2d.astype(np.float32), positions_3d.astype(np.float32)
except Exception as e:
print(f"HDF5から電極位置の抽出に失敗: {e}")
import traceback
traceback.print_exc()
return None, None
def extract_electrode_positions_2d(set_path: str):
"""
EEGLABファイルから電極位置(2D, 3D)を抽出。
Returns:
tuple: (pos_2d, pos_3d)
pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone
pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone
"""
try:
# MNEで読み込み
raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False)
montage = raw.get_montage()
if montage is None:
return None, None
# 3D座標を取得
pos_3d_dict = montage.get_positions()['ch_pos']
if not pos_3d_dict:
return None, None
# チャンネル名順に並べ替え
ch_names = raw.ch_names
positions_3d = []
for ch_name in ch_names:
if ch_name in pos_3d_dict:
positions_3d.append(pos_3d_dict[ch_name])
else:
# 座標がないチャンネルは原点に配置
positions_3d.append([0, 0, 0])
positions_3d = np.array(positions_3d)
# 3D座標を正規化
max_dist_3d = np.max(np.sqrt(np.sum(positions_3d**2, axis=1)))
if max_dist_3d > 0:
positions_3d = positions_3d / max_dist_3d
# 3D -> 2D 投影(上から見た図)
pos_2d = positions_3d[:, :2]
# 2D座標を正規化: 最大距離が0.85になるようにスケーリング
max_dist_2d = np.max(np.sqrt(np.sum(pos_2d**2, axis=1)))
if max_dist_2d > 0:
pos_2d = pos_2d / max_dist_2d * 0.85
return pos_2d.astype(np.float32), positions_3d.astype(np.float32)
except Exception as e:
print(f"電極位置の抽出に失敗: {e}")
return None, None
def _load_eeglab_hdf5(set_path: str, fdt_path: Optional[str] = None, debug: bool = False):
"""
Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py.
Returns: (x_tc, fs) where x_tc is (T, C)
"""
if h5py is None:
raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。")
with h5py.File(set_path, "r") as f:
# デバッグ: ファイル構造を表示
if debug:
print("=== HDF5 file structure ===")
def print_structure(name, obj):
if isinstance(obj, h5py.Dataset):
print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}")
elif isinstance(obj, h5py.Group):
print(f"Group: {name}")
f.visititems(print_structure)
print("===========================")
# サンプリングレートを取得
fs = None
for path in ["EEG/srate", "srate"]:
if path in f:
srate_data = f[path]
if isinstance(srate_data, h5py.Dataset):
val = srate_data[()]
# 配列の場合は最初の要素を取得
fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val)
break
if fs is None:
raise ValueError("サンプリングレート (srate) が見つかりません")
# チャンネル数を取得
nbchan = None
for path in ["EEG/nbchan", "nbchan"]:
if path in f:
nbchan_data = f[path]
if isinstance(nbchan_data, h5py.Dataset):
val = nbchan_data[()]
nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
# サンプル数を取得
pnts = None
for path in ["EEG/pnts", "pnts"]:
if path in f:
pnts_data = f[path]
if isinstance(pnts_data, h5py.Dataset):
val = pnts_data[()]
pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
if debug:
print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}")
# データを取得 - まず .set 内を確認
data = None
data_shape = None
if debug:
print(f"Checking for data, fdt_path provided: {fdt_path is not None}")
if fdt_path:
print(f"fdt_path exists: {os.path.exists(fdt_path)}")
# パターン1: EEG/data が参照配列の場合、各参照を辿る
if "EEG" in f and "data" in f["EEG"]:
data_ref = f["EEG"]["data"]
if isinstance(data_ref, h5py.Dataset):
if debug:
print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}")
if data_ref.dtype == h5py.ref_dtype:
# 参照の場合 - 通常は .fdt ファイルを指す
if debug:
print("EEG/data is reference type - data should be in .fdt file")
# .fdt ファイルが必要
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
elif data_ref.size > 100: # 参照配列ではなく実データ
data = data_ref[()]
data_shape = data.shape
if debug:
print(f"EEG/data contains actual data, shape: {data_shape}")
else:
# 小さい配列 = 参照リスト、.fdtファイルが必要
if debug:
print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt")
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
# パターン2: 直接 data
if data is None and "data" in f:
data_obj = f["data"]
if isinstance(data_obj, h5py.Dataset):
data = data_obj[()]
data_shape = data.shape
if data is None:
raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。")
if debug:
print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}")
# データの形状を調整
if data.ndim != 2:
raise ValueError(f"予期しないデータ次元: {data.ndim}")
dim0, dim1 = data.shape
# nbchan情報があればそれを使う
if nbchan is not None:
if dim0 == nbchan:
# (C, T) 形式
x_tc = data.T.astype(np.float32)
elif dim1 == nbchan:
# (T, C) 形式
x_tc = data.astype(np.float32)
else:
# nbchanと一致しない場合は小さい方をチャンネル数と仮定
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
else:
# 一般的な判定: 小さい方がチャンネル数
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
if debug:
print(f"Final shape (T, C): {x_tc.shape}")
return x_tc, fs
"""
EEGLABファイルから電極位置(2D)を抽出。
Returns:
pos: (C, 2) 電極の2D座標、取得できない場合はNone
"""
try:
# MNEで読み込み
raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False)
montage = raw.get_montage()
if montage is None:
return None
# 3D座標を取得
pos_3d = montage.get_positions()['ch_pos']
if not pos_3d:
return None
# チャンネル名順に並べ替え
ch_names = raw.ch_names
positions = []
for ch_name in ch_names:
if ch_name in pos_3d:
positions.append(pos_3d[ch_name])
else:
# 座標がないチャンネルは原点に配置
positions.append([0, 0, 0])
positions = np.array(positions)
# 3D -> 2D 投影(上から見た図)
# x, y座標を使用し、正規化
pos_2d = positions[:, :2]
# 正規化: 最大距離が1になるようにスケーリング
max_dist = np.max(np.sqrt(np.sum(pos_2d**2, axis=1)))
if max_dist > 0:
pos_2d = pos_2d / max_dist * 0.85 # 0.85倍で頭の輪郭内に収める
return pos_2d.astype(np.float32)
except Exception as e:
print(f"電極位置の抽出に失敗: {e}")
return None
"""
Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py.
Returns: (x_tc, fs) where x_tc is (T, C)
"""
if h5py is None:
raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。")
with h5py.File(set_path, "r") as f:
# デバッグ: ファイル構造を表示
if debug:
print("=== HDF5 file structure ===")
def print_structure(name, obj):
if isinstance(obj, h5py.Dataset):
print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}")
elif isinstance(obj, h5py.Group):
print(f"Group: {name}")
f.visititems(print_structure)
print("===========================")
# サンプリングレートを取得
fs = None
for path in ["EEG/srate", "srate"]:
if path in f:
srate_data = f[path]
if isinstance(srate_data, h5py.Dataset):
val = srate_data[()]
# 配列の場合は最初の要素を取得
fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val)
break
if fs is None:
raise ValueError("サンプリングレート (srate) が見つかりません")
# チャンネル数を取得
nbchan = None
for path in ["EEG/nbchan", "nbchan"]:
if path in f:
nbchan_data = f[path]
if isinstance(nbchan_data, h5py.Dataset):
val = nbchan_data[()]
nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
# サンプル数を取得
pnts = None
for path in ["EEG/pnts", "pnts"]:
if path in f:
pnts_data = f[path]
if isinstance(pnts_data, h5py.Dataset):
val = pnts_data[()]
pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
if debug:
print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}")
# データを取得 - まず .set 内を確認
data = None
data_shape = None
if debug:
print(f"Checking for data, fdt_path provided: {fdt_path is not None}")
if fdt_path:
print(f"fdt_path exists: {os.path.exists(fdt_path)}")
# パターン1: EEG/data が参照配列の場合、各参照を辿る
if "EEG" in f and "data" in f["EEG"]:
data_ref = f["EEG"]["data"]
if isinstance(data_ref, h5py.Dataset):
if debug:
print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}")
if data_ref.dtype == h5py.ref_dtype:
# 参照の場合 - 通常は .fdt ファイルを指す
if debug:
print("EEG/data is reference type - data should be in .fdt file")
# .fdt ファイルが必要
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
elif data_ref.size > 100: # 参照配列ではなく実データ
data = data_ref[()]
data_shape = data.shape
if debug:
print(f"EEG/data contains actual data, shape: {data_shape}")
else:
# 小さい配列 = 参照リスト、.fdtファイルが必要
if debug:
print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt")
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
# パターン2: 直接 data
if data is None and "data" in f:
data_obj = f["data"]
if isinstance(data_obj, h5py.Dataset):
data = data_obj[()]
data_shape = data.shape
if data is None:
raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。")
if debug:
print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}")
# データの形状を調整
if data.ndim != 2:
raise ValueError(f"予期しないデータ次元: {data.ndim}")
dim0, dim1 = data.shape
# nbchan情報があればそれを使う
if nbchan is not None:
if dim0 == nbchan:
# (C, T) 形式
x_tc = data.T.astype(np.float32)
elif dim1 == nbchan:
# (T, C) 形式
x_tc = data.astype(np.float32)
else:
# nbchanと一致しない場合は小さい方をチャンネル数と仮定
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
else:
# 一般的な判定: 小さい方がチャンネル数
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
if debug:
print(f"Final shape (T, C): {x_tc.shape}")
return x_tc, fs
def _load_fdt_file(fdt_path: str, nbchan: Optional[int], pnts: Optional[int], debug: bool = False) -> np.ndarray:
"""
Load .fdt file (raw binary float32 data).
EEGLAB .fdt files are stored as float32 in (C, T) order.
"""
if debug:
print(f"Loading .fdt file: {fdt_path}")
# .fdt ファイルは float32 のバイナリデータ
data = np.fromfile(fdt_path, dtype=np.float32)
if debug:
print(f"Loaded {data.size} float32 values from .fdt")
# チャンネル数とサンプル数がわかっている場合はリシェイプ
if nbchan is not None and pnts is not None:
expected_size = nbchan * pnts
if data.size == expected_size:
# EEGLAB は (C, T) 順で保存
data = data.reshape(nbchan, pnts)
if debug:
print(f"Reshaped to ({nbchan}, {pnts})")
else:
if debug:
print(f"Warning: expected {expected_size} values but got {data.size}")
# 可能な限りリシェイプを試みる
if data.size % nbchan == 0:
data = data.reshape(nbchan, -1)
elif data.size % pnts == 0:
data = data.reshape(-1, pnts)
else:
raise ValueError(f"Cannot reshape data of size {data.size} with nbchan={nbchan}, pnts={pnts}")
else:
raise ValueError("nbchan と pnts の情報が必要です")
return data
def load_eeglab_tc_from_bytes(
set_bytes: bytes,
set_name: str,
fdt_bytes: Optional[bytes] = None,
fdt_name: Optional[str] = None,
):
"""
Load EEGLAB .set (and optional .fdt) from bytes using MNE or h5py.
Returns:
tuple: (x_tc, fs, electrode_pos_2d, electrode_pos_3d)
x_tc: (T, C) float32
fs: sampling rate (Hz)
electrode_pos_2d: (C, 2) float32 or None - 電極の2D座標
electrode_pos_3d: (C, 3) float32 or None - 電極の3D座標
Notes:
- 多くのEEGLABは .set が .fdt を参照するため、同じディレクトリに同名で置く必要があります。
- .set単体で完結している場合は fdt_* を省略可能にしています。
- MATLAB v7.3 (HDF5) 形式の .set にも対応しています。
"""
if fdt_bytes is not None or fdt_name is not None:
if fdt_bytes is None or fdt_name is None:
raise ValueError("fdt_bytes と fdt_name は両方指定してください。")
if not same_stem(set_name, fdt_name):
raise ValueError(f".set と .fdt のファイル名(拡張子除く)が一致していません: {set_name} vs {fdt_name}")
with tempfile.TemporaryDirectory() as tmpdir:
set_path = os.path.join(tmpdir, os.path.basename(set_name))
with open(set_path, "wb") as f:
f.write(set_bytes)
fdt_path = None # 初期化
if fdt_bytes is not None and fdt_name is not None:
fdt_path = os.path.join(tmpdir, os.path.basename(fdt_name))
with open(fdt_path, "wb") as f:
f.write(fdt_bytes)
# 1) Rawとして読む(通常のEEGLAB形式)
try:
raw = mne.io.read_raw_eeglab(set_path, preload=True, verbose=False)
fs = float(raw.info["sfreq"])
x_tc = raw.get_data().T # (T,C)
# 電極位置を取得
result = extract_electrode_positions_2d(set_path)
if result is not None:
electrode_pos_2d, electrode_pos_3d = result
else:
electrode_pos_2d, electrode_pos_3d = None, None
return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_raw:
# 2) Epochsとして読む(エポックデータ用)
try:
epochs = mne.io.read_epochs_eeglab(set_path, verbose=False, montage_units="cm")
fs = float(epochs.info["sfreq"])
x = epochs.get_data(copy=True) # (n_epochs, n_channels, n_times)
# ここは方針を選ぶ:平均 or 連結
x_mean = x.mean(axis=0) # (C,T)
x_tc = x_mean.T # (T,C)
# 電極位置を取得(epochsからも取得可能)
result = extract_electrode_positions_2d(set_path)
if result is not None:
electrode_pos_2d, electrode_pos_3d = result
else:
electrode_pos_2d, electrode_pos_3d = None, None
return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_ep:
# 3) HDF5形式として読む(MATLAB v7.3)
try:
# デバッグモードを有効化(環境変数で制御可能)
debug = os.environ.get("EEGLAB_DEBUG", "0") == "1"
# Streamlit環境では常にデバッグ情報を表示
import sys
if 'streamlit' in sys.modules:
debug = True
try:
x_tc, fs = _load_eeglab_hdf5(set_path, fdt_path=fdt_path, debug=debug)
except Exception as e_hdf5_inner:
import traceback
print("HDF5読み込みの詳細エラー:")
print(traceback.format_exc())
raise e_hdf5_inner
# HDF5の場合、電極位置をHDF5から直接取得を試みる
electrode_pos_2d, electrode_pos_3d = extract_electrode_positions_from_hdf5(set_path)
if debug and electrode_pos_2d is not None:
print(f"HDF5から電極位置を取得しました: {electrode_pos_2d.shape}")
return x_tc, fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_hdf5:
import traceback
# すべて失敗した場合
msg = (
"EEGLABの読み込みに失敗しました。\n"
f"- read_raw_eeglab error: {e_raw}\n"
f"- read_epochs_eeglab error: {e_ep}\n"
f"- HDF5読み込み error: {e_hdf5}\n"
f"\n詳細トレースバック:\n{traceback.format_exc()}"
)
raise RuntimeError(msg) from e_hdf5
# ============================================================
# MAT loader (.mat)
# ============================================================
def _mat_keys_loadmat(mat_dict: Dict[str, Any]) -> List[str]:
return sorted([k for k in mat_dict.keys() if not k.startswith("__")])
def _try_get_numeric_arrays_loadmat(mat_dict: Dict[str, Any]) -> Dict[str, np.ndarray]:
"""
loadmatで読んだdictから、1D/2Dの数値ndarrayだけ抽出して返す。
3次元配列も含める(エポックデータの可能性)。
"""
out: Dict[str, np.ndarray] = {}
for k in _mat_keys_loadmat(mat_dict):
v = mat_dict[k]
if isinstance(v, np.ndarray) and v.size > 0:
# 数値型かどうかチェック
if np.issubdtype(v.dtype, np.number):
if v.ndim in (1, 2):
out[k] = v
elif v.ndim == 3:
# 3次元配列の場合は (epochs, channels, time) の可能性
# 平均を取って2次元にする、または連結する
out[k + "_mean"] = v.mean(axis=0) # (C, T)
out[k + "_concat"] = v.reshape(-1, v.shape[-1]) # (epochs*C, T)
return out
def _load_mat_v72(bytes_data: bytes) -> Dict[str, Any]:
# v7.2以前のMAT(一般的なMAT)
return loadmat(io.BytesIO(bytes_data), squeeze_me=False, struct_as_record=False)
def _load_mat_v73_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]:
"""
v7.3(HDF5)のMATから、数値1D/2D/3D dataset を拾って返す。
keyは HDF5内のパスになります(例: 'group/data')。
修正: h5pyの新しいバージョンに対応。BytesIOではなく一時ファイルを使用。
"""
if h5py is None:
raise RuntimeError("MAT v7.3(HDF5) 形式の可能性がありますが、h5py が入っていません。pip install h5py を実行してください。")
out: Dict[str, np.ndarray] = {}
# h5pyの新しいバージョンではBytesIOから直接開けない場合があるため、一時ファイルを使用
with tempfile.NamedTemporaryFile(suffix='.mat', delete=False) as tmp:
tmp.write(bytes_data)
tmp_path = tmp.name
try:
with h5py.File(tmp_path, "r") as f:
def visitor(name, obj):
if not isinstance(obj, h5py.Dataset):
return
try:
arr = obj[()]
except Exception:
return
# MATLABの文字列/参照等は除外して、数値だけ
if isinstance(arr, np.ndarray) and arr.size > 0 and np.issubdtype(arr.dtype, np.number):
if arr.ndim in (1, 2):
out[name] = arr
elif arr.ndim == 3:
# 3次元配列も含める
out[name + "_mean"] = arr.mean(axis=0)
out[name + "_concat"] = arr.reshape(-1, arr.shape[-1])
f.visititems(lambda name, obj: visitor(name, obj))
finally:
# 一時ファイルを削除
try:
os.unlink(tmp_path)
except Exception:
pass
return out
def load_mat_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]:
"""
Return dict: variable_name -> ndarray(1D/2D numeric)
Tries v7.2 (scipy.io.loadmat). If it fails, tries v7.3 (h5py).
"""
try:
md = _load_mat_v72(bytes_data)
cands = _try_get_numeric_arrays_loadmat(md)
return cands
except Exception:
return _load_mat_v73_candidates(bytes_data) |