stardust-coder's picture
[add] directed graph
e648c90
from __future__ import annotations
from typing import List, Tuple, Optional, Dict, Any
import io
import os
import tempfile
import numpy as np
import mne
from scipy.io import loadmat
try:
import h5py # MAT v7.3 (HDF5)
except Exception: # pragma: no cover
h5py = None
# ============================================================
# EEGLAB loader (.set + .fdt)
# ============================================================
def pick_set_fdt(files) -> Tuple[Optional[object], Optional[object]]:
"""
Streamlitの accept_multiple_files=True で受け取ったfilesから .set と .fdt を拾う。
Returns: (set_file, fdt_file)
"""
set_file = None
fdt_file = None
for f in files:
name = (getattr(f, "name", "") or "").lower()
if name.endswith(".set"):
set_file = f
elif name.endswith(".fdt"):
fdt_file = f
return set_file, fdt_file
def same_stem(a_name: str, b_name: str) -> bool:
"""Check if two filenames have the same stem (basename without extension)."""
a_stem = os.path.splitext(os.path.basename(a_name))[0]
b_stem = os.path.splitext(os.path.basename(b_name))[0]
return a_stem == b_stem
def extract_electrode_positions_from_hdf5(set_path: str) -> tuple:
"""
HDF5形式のEEGLABファイルから電極位置を抽出。
Returns:
tuple: (pos_2d, pos_3d)
pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone
pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone
"""
if h5py is None:
return None, None
try:
with h5py.File(set_path, "r") as f:
# EEGLABのchanlocs構造を探す
chanlocs_path = None
for path in ["EEG/chanlocs", "chanlocs"]:
if path in f:
chanlocs_path = path
break
if chanlocs_path is None:
return None, None
chanlocs = f[chanlocs_path]
# X, Y, Z座標を取得
xs, ys, zs = [], [], []
# パターン1: chanlocs/X, chanlocs/Y, chanlocs/Z が直接データの場合
if "X" in chanlocs and "Y" in chanlocs and "Z" in chanlocs:
x_data = chanlocs["X"][()]
y_data = chanlocs["Y"][()]
z_data = chanlocs["Z"][()]
# 参照型の場合は各参照を辿る
if x_data.dtype == h5py.ref_dtype:
for i in range(len(x_data)):
try:
x_val = f[x_data[i, 0]][()]
y_val = f[y_data[i, 0]][()]
z_val = f[z_data[i, 0]][()]
# スカラー値を取得
x_val = float(x_val.flat[0]) if hasattr(x_val, 'flat') else float(x_val)
y_val = float(y_val.flat[0]) if hasattr(y_val, 'flat') else float(y_val)
z_val = float(z_val.flat[0]) if hasattr(z_val, 'flat') else float(z_val)
xs.append(x_val)
ys.append(y_val)
zs.append(z_val)
except:
# 読み込めない座標は0に
xs.append(0.0)
ys.append(0.0)
zs.append(0.0)
else:
# 直接数値データの場合
xs = x_data.flatten().astype(float)
ys = y_data.flatten().astype(float)
zs = z_data.flatten().astype(float)
else:
return None, None
# リストをnumpy配列に変換
xs = np.array(xs, dtype=float)
ys = np.array(ys, dtype=float)
zs = np.array(zs, dtype=float)
if len(xs) == 0:
return None, None
# NaN値をチェック(数値型に変換後)
valid_mask = ~(np.isnan(xs) | np.isnan(ys) | np.isnan(zs))
if not np.any(valid_mask):
return None, None
# 無効な座標は平均値で置き換え
if not np.all(valid_mask):
xs[~valid_mask] = np.nanmean(xs)
ys[~valid_mask] = np.nanmean(ys)
zs[~valid_mask] = np.nanmean(zs)
# 3D座標を構築
positions_3d = np.column_stack([xs, ys, zs])
# 正規化
dists = np.sqrt(np.sum(positions_3d**2, axis=1))
max_dist_3d = np.max(dists[dists > 0]) if np.any(dists > 0) else 1.0
if max_dist_3d > 0:
positions_3d = positions_3d / max_dist_3d
# 2D投影
pos_2d = positions_3d[:, :2]
dists_2d = np.sqrt(np.sum(pos_2d**2, axis=1))
max_dist_2d = np.max(dists_2d[dists_2d > 0]) if np.any(dists_2d > 0) else 1.0
if max_dist_2d > 0:
pos_2d = pos_2d / max_dist_2d * 0.85
print(f"HDF5から電極位置を取得: {len(xs)} channels")
return pos_2d.astype(np.float32), positions_3d.astype(np.float32)
except Exception as e:
print(f"HDF5から電極位置の抽出に失敗: {e}")
import traceback
traceback.print_exc()
return None, None
def extract_electrode_positions_2d(set_path: str):
"""
EEGLABファイルから電極位置(2D, 3D)を抽出。
Returns:
tuple: (pos_2d, pos_3d)
pos_2d: (C, 2) 電極の2D座標、取得できない場合はNone
pos_3d: (C, 3) 電極の3D座標、取得できない場合はNone
"""
try:
# MNEで読み込み
raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False)
montage = raw.get_montage()
if montage is None:
return None, None
# 3D座標を取得
pos_3d_dict = montage.get_positions()['ch_pos']
if not pos_3d_dict:
return None, None
# チャンネル名順に並べ替え
ch_names = raw.ch_names
positions_3d = []
for ch_name in ch_names:
if ch_name in pos_3d_dict:
positions_3d.append(pos_3d_dict[ch_name])
else:
# 座標がないチャンネルは原点に配置
positions_3d.append([0, 0, 0])
positions_3d = np.array(positions_3d)
# 3D座標を正規化
max_dist_3d = np.max(np.sqrt(np.sum(positions_3d**2, axis=1)))
if max_dist_3d > 0:
positions_3d = positions_3d / max_dist_3d
# 3D -> 2D 投影(上から見た図)
pos_2d = positions_3d[:, :2]
# 2D座標を正規化: 最大距離が0.85になるようにスケーリング
max_dist_2d = np.max(np.sqrt(np.sum(pos_2d**2, axis=1)))
if max_dist_2d > 0:
pos_2d = pos_2d / max_dist_2d * 0.85
return pos_2d.astype(np.float32), positions_3d.astype(np.float32)
except Exception as e:
print(f"電極位置の抽出に失敗: {e}")
return None, None
def _load_eeglab_hdf5(set_path: str, fdt_path: Optional[str] = None, debug: bool = False):
"""
Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py.
Returns: (x_tc, fs) where x_tc is (T, C)
"""
if h5py is None:
raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。")
with h5py.File(set_path, "r") as f:
# デバッグ: ファイル構造を表示
if debug:
print("=== HDF5 file structure ===")
def print_structure(name, obj):
if isinstance(obj, h5py.Dataset):
print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}")
elif isinstance(obj, h5py.Group):
print(f"Group: {name}")
f.visititems(print_structure)
print("===========================")
# サンプリングレートを取得
fs = None
for path in ["EEG/srate", "srate"]:
if path in f:
srate_data = f[path]
if isinstance(srate_data, h5py.Dataset):
val = srate_data[()]
# 配列の場合は最初の要素を取得
fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val)
break
if fs is None:
raise ValueError("サンプリングレート (srate) が見つかりません")
# チャンネル数を取得
nbchan = None
for path in ["EEG/nbchan", "nbchan"]:
if path in f:
nbchan_data = f[path]
if isinstance(nbchan_data, h5py.Dataset):
val = nbchan_data[()]
nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
# サンプル数を取得
pnts = None
for path in ["EEG/pnts", "pnts"]:
if path in f:
pnts_data = f[path]
if isinstance(pnts_data, h5py.Dataset):
val = pnts_data[()]
pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
if debug:
print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}")
# データを取得 - まず .set 内を確認
data = None
data_shape = None
if debug:
print(f"Checking for data, fdt_path provided: {fdt_path is not None}")
if fdt_path:
print(f"fdt_path exists: {os.path.exists(fdt_path)}")
# パターン1: EEG/data が参照配列の場合、各参照を辿る
if "EEG" in f and "data" in f["EEG"]:
data_ref = f["EEG"]["data"]
if isinstance(data_ref, h5py.Dataset):
if debug:
print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}")
if data_ref.dtype == h5py.ref_dtype:
# 参照の場合 - 通常は .fdt ファイルを指す
if debug:
print("EEG/data is reference type - data should be in .fdt file")
# .fdt ファイルが必要
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
elif data_ref.size > 100: # 参照配列ではなく実データ
data = data_ref[()]
data_shape = data.shape
if debug:
print(f"EEG/data contains actual data, shape: {data_shape}")
else:
# 小さい配列 = 参照リスト、.fdtファイルが必要
if debug:
print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt")
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
# パターン2: 直接 data
if data is None and "data" in f:
data_obj = f["data"]
if isinstance(data_obj, h5py.Dataset):
data = data_obj[()]
data_shape = data.shape
if data is None:
raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。")
if debug:
print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}")
# データの形状を調整
if data.ndim != 2:
raise ValueError(f"予期しないデータ次元: {data.ndim}")
dim0, dim1 = data.shape
# nbchan情報があればそれを使う
if nbchan is not None:
if dim0 == nbchan:
# (C, T) 形式
x_tc = data.T.astype(np.float32)
elif dim1 == nbchan:
# (T, C) 形式
x_tc = data.astype(np.float32)
else:
# nbchanと一致しない場合は小さい方をチャンネル数と仮定
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
else:
# 一般的な判定: 小さい方がチャンネル数
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
if debug:
print(f"Final shape (T, C): {x_tc.shape}")
return x_tc, fs
"""
EEGLABファイルから電極位置(2D)を抽出。
Returns:
pos: (C, 2) 電極の2D座標、取得できない場合はNone
"""
try:
# MNEで読み込み
raw = mne.io.read_raw_eeglab(set_path, preload=False, verbose=False)
montage = raw.get_montage()
if montage is None:
return None
# 3D座標を取得
pos_3d = montage.get_positions()['ch_pos']
if not pos_3d:
return None
# チャンネル名順に並べ替え
ch_names = raw.ch_names
positions = []
for ch_name in ch_names:
if ch_name in pos_3d:
positions.append(pos_3d[ch_name])
else:
# 座標がないチャンネルは原点に配置
positions.append([0, 0, 0])
positions = np.array(positions)
# 3D -> 2D 投影(上から見た図)
# x, y座標を使用し、正規化
pos_2d = positions[:, :2]
# 正規化: 最大距離が1になるようにスケーリング
max_dist = np.max(np.sqrt(np.sum(pos_2d**2, axis=1)))
if max_dist > 0:
pos_2d = pos_2d / max_dist * 0.85 # 0.85倍で頭の輪郭内に収める
return pos_2d.astype(np.float32)
except Exception as e:
print(f"電極位置の抽出に失敗: {e}")
return None
"""
Load EEGLAB .set file saved in MATLAB v7.3 (HDF5) format using h5py.
Returns: (x_tc, fs) where x_tc is (T, C)
"""
if h5py is None:
raise RuntimeError("EEGLAB .set ファイルが MATLAB v7.3 (HDF5) 形式ですが、h5py がインストールされていません。pip install h5py を実行してください。")
with h5py.File(set_path, "r") as f:
# デバッグ: ファイル構造を表示
if debug:
print("=== HDF5 file structure ===")
def print_structure(name, obj):
if isinstance(obj, h5py.Dataset):
print(f"Dataset: {name}, shape: {obj.shape}, dtype: {obj.dtype}")
elif isinstance(obj, h5py.Group):
print(f"Group: {name}")
f.visititems(print_structure)
print("===========================")
# サンプリングレートを取得
fs = None
for path in ["EEG/srate", "srate"]:
if path in f:
srate_data = f[path]
if isinstance(srate_data, h5py.Dataset):
val = srate_data[()]
# 配列の場合は最初の要素を取得
fs = float(val.flat[0]) if hasattr(val, 'flat') else float(val)
break
if fs is None:
raise ValueError("サンプリングレート (srate) が見つかりません")
# チャンネル数を取得
nbchan = None
for path in ["EEG/nbchan", "nbchan"]:
if path in f:
nbchan_data = f[path]
if isinstance(nbchan_data, h5py.Dataset):
val = nbchan_data[()]
nbchan = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
# サンプル数を取得
pnts = None
for path in ["EEG/pnts", "pnts"]:
if path in f:
pnts_data = f[path]
if isinstance(pnts_data, h5py.Dataset):
val = pnts_data[()]
pnts = int(val.flat[0]) if hasattr(val, 'flat') else int(val)
break
if debug:
print(f"nbchan: {nbchan}, pnts: {pnts}, fs: {fs}")
# データを取得 - まず .set 内を確認
data = None
data_shape = None
if debug:
print(f"Checking for data, fdt_path provided: {fdt_path is not None}")
if fdt_path:
print(f"fdt_path exists: {os.path.exists(fdt_path)}")
# パターン1: EEG/data が参照配列の場合、各参照を辿る
if "EEG" in f and "data" in f["EEG"]:
data_ref = f["EEG"]["data"]
if isinstance(data_ref, h5py.Dataset):
if debug:
print(f"EEG/data dtype: {data_ref.dtype}, shape: {data_ref.shape}, size: {data_ref.size}")
if data_ref.dtype == h5py.ref_dtype:
# 参照の場合 - 通常は .fdt ファイルを指す
if debug:
print("EEG/data is reference type - data should be in .fdt file")
# .fdt ファイルが必要
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
elif data_ref.size > 100: # 参照配列ではなく実データ
data = data_ref[()]
data_shape = data.shape
if debug:
print(f"EEG/data contains actual data, shape: {data_shape}")
else:
# 小さい配列 = 参照リスト、.fdtファイルが必要
if debug:
print(f"EEG/data is small array (size={data_ref.size}), assuming reference to .fdt")
if fdt_path is not None and os.path.exists(fdt_path):
data = _load_fdt_file(fdt_path, nbchan, pnts, debug=debug)
else:
raise ValueError(".fdt ファイルが必要ですが見つかりません。.set と .fdt の両方をアップロードしてください。")
# パターン2: 直接 data
if data is None and "data" in f:
data_obj = f["data"]
if isinstance(data_obj, h5py.Dataset):
data = data_obj[()]
data_shape = data.shape
if data is None:
raise ValueError("EEGデータが見つかりません。.fdt ファイルが必要な可能性があります。")
if debug:
print(f"Data shape: {data.shape if hasattr(data, 'shape') else 'loaded from fdt'}")
# データの形状を調整
if data.ndim != 2:
raise ValueError(f"予期しないデータ次元: {data.ndim}")
dim0, dim1 = data.shape
# nbchan情報があればそれを使う
if nbchan is not None:
if dim0 == nbchan:
# (C, T) 形式
x_tc = data.T.astype(np.float32)
elif dim1 == nbchan:
# (T, C) 形式
x_tc = data.astype(np.float32)
else:
# nbchanと一致しない場合は小さい方をチャンネル数と仮定
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
else:
# 一般的な判定: 小さい方がチャンネル数
if dim0 < dim1:
x_tc = data.T.astype(np.float32)
else:
x_tc = data.astype(np.float32)
if debug:
print(f"Final shape (T, C): {x_tc.shape}")
return x_tc, fs
def _load_fdt_file(fdt_path: str, nbchan: Optional[int], pnts: Optional[int], debug: bool = False) -> np.ndarray:
"""
Load .fdt file (raw binary float32 data).
EEGLAB .fdt files are stored as float32 in (C, T) order.
"""
if debug:
print(f"Loading .fdt file: {fdt_path}")
# .fdt ファイルは float32 のバイナリデータ
data = np.fromfile(fdt_path, dtype=np.float32)
if debug:
print(f"Loaded {data.size} float32 values from .fdt")
# チャンネル数とサンプル数がわかっている場合はリシェイプ
if nbchan is not None and pnts is not None:
expected_size = nbchan * pnts
if data.size == expected_size:
# EEGLAB は (C, T) 順で保存
data = data.reshape(nbchan, pnts)
if debug:
print(f"Reshaped to ({nbchan}, {pnts})")
else:
if debug:
print(f"Warning: expected {expected_size} values but got {data.size}")
# 可能な限りリシェイプを試みる
if data.size % nbchan == 0:
data = data.reshape(nbchan, -1)
elif data.size % pnts == 0:
data = data.reshape(-1, pnts)
else:
raise ValueError(f"Cannot reshape data of size {data.size} with nbchan={nbchan}, pnts={pnts}")
else:
raise ValueError("nbchan と pnts の情報が必要です")
return data
def load_eeglab_tc_from_bytes(
set_bytes: bytes,
set_name: str,
fdt_bytes: Optional[bytes] = None,
fdt_name: Optional[str] = None,
):
"""
Load EEGLAB .set (and optional .fdt) from bytes using MNE or h5py.
Returns:
tuple: (x_tc, fs, electrode_pos_2d, electrode_pos_3d)
x_tc: (T, C) float32
fs: sampling rate (Hz)
electrode_pos_2d: (C, 2) float32 or None - 電極の2D座標
electrode_pos_3d: (C, 3) float32 or None - 電極の3D座標
Notes:
- 多くのEEGLABは .set が .fdt を参照するため、同じディレクトリに同名で置く必要があります。
- .set単体で完結している場合は fdt_* を省略可能にしています。
- MATLAB v7.3 (HDF5) 形式の .set にも対応しています。
"""
if fdt_bytes is not None or fdt_name is not None:
if fdt_bytes is None or fdt_name is None:
raise ValueError("fdt_bytes と fdt_name は両方指定してください。")
if not same_stem(set_name, fdt_name):
raise ValueError(f".set と .fdt のファイル名(拡張子除く)が一致していません: {set_name} vs {fdt_name}")
with tempfile.TemporaryDirectory() as tmpdir:
set_path = os.path.join(tmpdir, os.path.basename(set_name))
with open(set_path, "wb") as f:
f.write(set_bytes)
fdt_path = None # 初期化
if fdt_bytes is not None and fdt_name is not None:
fdt_path = os.path.join(tmpdir, os.path.basename(fdt_name))
with open(fdt_path, "wb") as f:
f.write(fdt_bytes)
# 1) Rawとして読む(通常のEEGLAB形式)
try:
raw = mne.io.read_raw_eeglab(set_path, preload=True, verbose=False)
fs = float(raw.info["sfreq"])
x_tc = raw.get_data().T # (T,C)
# 電極位置を取得
result = extract_electrode_positions_2d(set_path)
if result is not None:
electrode_pos_2d, electrode_pos_3d = result
else:
electrode_pos_2d, electrode_pos_3d = None, None
return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_raw:
# 2) Epochsとして読む(エポックデータ用)
try:
epochs = mne.io.read_epochs_eeglab(set_path, verbose=False, montage_units="cm")
fs = float(epochs.info["sfreq"])
x = epochs.get_data(copy=True) # (n_epochs, n_channels, n_times)
# ここは方針を選ぶ:平均 or 連結
x_mean = x.mean(axis=0) # (C,T)
x_tc = x_mean.T # (T,C)
# 電極位置を取得(epochsからも取得可能)
result = extract_electrode_positions_2d(set_path)
if result is not None:
electrode_pos_2d, electrode_pos_3d = result
else:
electrode_pos_2d, electrode_pos_3d = None, None
return x_tc.astype(np.float32), fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_ep:
# 3) HDF5形式として読む(MATLAB v7.3)
try:
# デバッグモードを有効化(環境変数で制御可能)
debug = os.environ.get("EEGLAB_DEBUG", "0") == "1"
# Streamlit環境では常にデバッグ情報を表示
import sys
if 'streamlit' in sys.modules:
debug = True
try:
x_tc, fs = _load_eeglab_hdf5(set_path, fdt_path=fdt_path, debug=debug)
except Exception as e_hdf5_inner:
import traceback
print("HDF5読み込みの詳細エラー:")
print(traceback.format_exc())
raise e_hdf5_inner
# HDF5の場合、電極位置をHDF5から直接取得を試みる
electrode_pos_2d, electrode_pos_3d = extract_electrode_positions_from_hdf5(set_path)
if debug and electrode_pos_2d is not None:
print(f"HDF5から電極位置を取得しました: {electrode_pos_2d.shape}")
return x_tc, fs, electrode_pos_2d, electrode_pos_3d
except Exception as e_hdf5:
import traceback
# すべて失敗した場合
msg = (
"EEGLABの読み込みに失敗しました。\n"
f"- read_raw_eeglab error: {e_raw}\n"
f"- read_epochs_eeglab error: {e_ep}\n"
f"- HDF5読み込み error: {e_hdf5}\n"
f"\n詳細トレースバック:\n{traceback.format_exc()}"
)
raise RuntimeError(msg) from e_hdf5
# ============================================================
# MAT loader (.mat)
# ============================================================
def _mat_keys_loadmat(mat_dict: Dict[str, Any]) -> List[str]:
return sorted([k for k in mat_dict.keys() if not k.startswith("__")])
def _try_get_numeric_arrays_loadmat(mat_dict: Dict[str, Any]) -> Dict[str, np.ndarray]:
"""
loadmatで読んだdictから、1D/2Dの数値ndarrayだけ抽出して返す。
3次元配列も含める(エポックデータの可能性)。
"""
out: Dict[str, np.ndarray] = {}
for k in _mat_keys_loadmat(mat_dict):
v = mat_dict[k]
if isinstance(v, np.ndarray) and v.size > 0:
# 数値型かどうかチェック
if np.issubdtype(v.dtype, np.number):
if v.ndim in (1, 2):
out[k] = v
elif v.ndim == 3:
# 3次元配列の場合は (epochs, channels, time) の可能性
# 平均を取って2次元にする、または連結する
out[k + "_mean"] = v.mean(axis=0) # (C, T)
out[k + "_concat"] = v.reshape(-1, v.shape[-1]) # (epochs*C, T)
return out
def _load_mat_v72(bytes_data: bytes) -> Dict[str, Any]:
# v7.2以前のMAT(一般的なMAT)
return loadmat(io.BytesIO(bytes_data), squeeze_me=False, struct_as_record=False)
def _load_mat_v73_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]:
"""
v7.3(HDF5)のMATから、数値1D/2D/3D dataset を拾って返す。
keyは HDF5内のパスになります(例: 'group/data')。
修正: h5pyの新しいバージョンに対応。BytesIOではなく一時ファイルを使用。
"""
if h5py is None:
raise RuntimeError("MAT v7.3(HDF5) 形式の可能性がありますが、h5py が入っていません。pip install h5py を実行してください。")
out: Dict[str, np.ndarray] = {}
# h5pyの新しいバージョンではBytesIOから直接開けない場合があるため、一時ファイルを使用
with tempfile.NamedTemporaryFile(suffix='.mat', delete=False) as tmp:
tmp.write(bytes_data)
tmp_path = tmp.name
try:
with h5py.File(tmp_path, "r") as f:
def visitor(name, obj):
if not isinstance(obj, h5py.Dataset):
return
try:
arr = obj[()]
except Exception:
return
# MATLABの文字列/参照等は除外して、数値だけ
if isinstance(arr, np.ndarray) and arr.size > 0 and np.issubdtype(arr.dtype, np.number):
if arr.ndim in (1, 2):
out[name] = arr
elif arr.ndim == 3:
# 3次元配列も含める
out[name + "_mean"] = arr.mean(axis=0)
out[name + "_concat"] = arr.reshape(-1, arr.shape[-1])
f.visititems(lambda name, obj: visitor(name, obj))
finally:
# 一時ファイルを削除
try:
os.unlink(tmp_path)
except Exception:
pass
return out
def load_mat_candidates(bytes_data: bytes) -> Dict[str, np.ndarray]:
"""
Return dict: variable_name -> ndarray(1D/2D numeric)
Tries v7.2 (scipy.io.loadmat). If it fails, tries v7.3 (h5py).
"""
try:
md = _load_mat_v72(bytes_data)
cands = _try_get_numeric_arrays_loadmat(md)
return cands
except Exception:
return _load_mat_v73_candidates(bytes_data)