| | import multiprocessing |
| | import operator |
| | import pickle |
| | import traceback |
| | from pathlib import Path |
| |
|
| | import samplelib.PackedFaceset |
| | from core import pathex |
| | from core.mplib import MPSharedList |
| | from core.interact import interact as io |
| | from core.joblib import Subprocessor |
| | from DFLIMG import * |
| | from facelib import FaceType, LandmarksProcessor |
| |
|
| | from .Sample import Sample, SampleType |
| |
|
| |
|
| | class SampleLoader: |
| | samples_cache = dict() |
| | @staticmethod |
| | def get_person_id_max_count(samples_path): |
| | samples = None |
| | try: |
| | samples = samplelib.PackedFaceset.load(samples_path) |
| | except: |
| | io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {str(samples_path)}, {traceback.format_exc()}") |
| |
|
| | if samples is None: |
| | raise ValueError("packed faceset not found.") |
| | persons_name_idxs = {} |
| | for sample in samples: |
| | persons_name_idxs[sample.person_name] = 0 |
| | return len(list(persons_name_idxs.keys())) |
| |
|
| | @staticmethod |
| | def load(sample_type, samples_path, subdirs=False): |
| | """ |
| | Return MPSharedList of samples |
| | """ |
| | samples_cache = SampleLoader.samples_cache |
| |
|
| | if str(samples_path) not in samples_cache.keys(): |
| | samples_cache[str(samples_path)] = [None]*SampleType.QTY |
| |
|
| | samples = samples_cache[str(samples_path)] |
| |
|
| | if sample_type == SampleType.IMAGE: |
| | if samples[sample_type] is None: |
| | samples[sample_type] = [ Sample(filename=filename) for filename in io.progress_bar_generator( pathex.get_image_paths(samples_path, subdirs=subdirs), "Loading") ] |
| |
|
| | elif sample_type == SampleType.FACE: |
| | if samples[sample_type] is None: |
| | try: |
| | result = samplelib.PackedFaceset.load(samples_path) |
| | except: |
| | io.log_err(f"Error occured while loading samplelib.PackedFaceset.load {str(samples_dat_path)}, {traceback.format_exc()}") |
| |
|
| | if result is not None: |
| | io.log_info (f"Loaded {len(result)} packed faces from {samples_path}") |
| |
|
| | if result is None: |
| | result = SampleLoader.load_face_samples( pathex.get_image_paths(samples_path, subdirs=subdirs) ) |
| |
|
| | samples[sample_type] = MPSharedList(result) |
| | elif sample_type == SampleType.FACE_TEMPORAL_SORTED: |
| | result = SampleLoader.load (SampleType.FACE, samples_path) |
| | result = SampleLoader.upgradeToFaceTemporalSortedSamples(result) |
| | samples[sample_type] = MPSharedList(result) |
| |
|
| | return samples[sample_type] |
| |
|
| | @staticmethod |
| | def load_face_samples ( image_paths): |
| | result = FaceSamplesLoaderSubprocessor(image_paths).run() |
| | sample_list = [] |
| |
|
| | for filename, data in result: |
| | if data is None: |
| | continue |
| | ( face_type, |
| | shape, |
| | landmarks, |
| | seg_ie_polys, |
| | xseg_mask_compressed, |
| | eyebrows_expand_mod, |
| | source_filename ) = data |
| | |
| | sample_list.append( Sample(filename=filename, |
| | sample_type=SampleType.FACE, |
| | face_type=FaceType.fromString (face_type), |
| | shape=shape, |
| | landmarks=landmarks, |
| | seg_ie_polys=seg_ie_polys, |
| | xseg_mask_compressed=xseg_mask_compressed, |
| | eyebrows_expand_mod=eyebrows_expand_mod, |
| | source_filename=source_filename, |
| | )) |
| | return sample_list |
| |
|
| | @staticmethod |
| | def upgradeToFaceTemporalSortedSamples( samples ): |
| | new_s = [ (s, s.source_filename) for s in samples] |
| | new_s = sorted(new_s, key=operator.itemgetter(1)) |
| |
|
| | return [ s[0] for s in new_s] |
| |
|
| |
|
| | class FaceSamplesLoaderSubprocessor(Subprocessor): |
| | |
| | def __init__(self, image_paths ): |
| | self.image_paths = image_paths |
| | self.image_paths_len = len(image_paths) |
| | self.idxs = [*range(self.image_paths_len)] |
| | self.result = [None]*self.image_paths_len |
| | super().__init__('FaceSamplesLoader', FaceSamplesLoaderSubprocessor.Cli, 60) |
| |
|
| | |
| | def on_clients_initialized(self): |
| | io.progress_bar ("Loading samples", len (self.image_paths)) |
| |
|
| | |
| | def on_clients_finalized(self): |
| | io.progress_bar_close() |
| |
|
| | |
| | def process_info_generator(self): |
| | for i in range(min(multiprocessing.cpu_count(), 8) ): |
| | yield 'CPU%d' % (i), {}, {} |
| |
|
| | |
| | def get_data(self, host_dict): |
| | if len (self.idxs) > 0: |
| | idx = self.idxs.pop(0) |
| | return idx, self.image_paths[idx] |
| |
|
| | return None |
| |
|
| | |
| | def on_data_return (self, host_dict, data): |
| | self.idxs.insert(0, data[0]) |
| |
|
| | |
| | def on_result (self, host_dict, data, result): |
| | idx, dflimg = result |
| | self.result[idx] = (self.image_paths[idx], dflimg) |
| | io.progress_bar_inc(1) |
| |
|
| | |
| | def get_result(self): |
| | return self.result |
| |
|
| | class Cli(Subprocessor.Cli): |
| | |
| | def process_data(self, data): |
| | idx, filename = data |
| | dflimg = DFLIMG.load (Path(filename)) |
| |
|
| | if dflimg is None or not dflimg.has_data(): |
| | self.log_err (f"FaceSamplesLoader: {filename} is not a dfl image file.") |
| | data = None |
| | else: |
| | data = (dflimg.get_face_type(), |
| | dflimg.get_shape(), |
| | dflimg.get_landmarks(), |
| | dflimg.get_seg_ie_polys(), |
| | dflimg.get_xseg_mask_compressed(), |
| | dflimg.get_eyebrows_expand_mod(), |
| | dflimg.get_source_filename() ) |
| |
|
| | return idx, data |
| |
|
| | |
| | def get_data_name (self, data): |
| | |
| | return data[1] |
| |
|