| | import traceback |
| | import json |
| | import multiprocessing |
| | import shutil |
| | from pathlib import Path |
| | import cv2 |
| | import numpy as np |
| |
|
| | from core import imagelib, pathex |
| | from core.cv2ex import * |
| | from core.interact import interact as io |
| | from core.joblib import Subprocessor |
| | from core.leras import nn |
| | from DFLIMG import * |
| | from facelib import FaceType, LandmarksProcessor |
| | from . import Extractor, Sorter |
| | from .Extractor import ExtractSubprocessor |
| |
|
| |
|
| | def extract_vggface2_dataset(input_dir, device_args={} ): |
| | multi_gpu = device_args.get('multi_gpu', False) |
| | cpu_only = device_args.get('cpu_only', False) |
| |
|
| | input_path = Path(input_dir) |
| | if not input_path.exists(): |
| | raise ValueError('Input directory not found. Please ensure it exists.') |
| |
|
| | bb_csv = input_path / 'loose_bb_train.csv' |
| | if not bb_csv.exists(): |
| | raise ValueError('loose_bb_train.csv found. Please ensure it exists.') |
| |
|
| | bb_lines = bb_csv.read_text().split('\n') |
| | bb_lines.pop(0) |
| |
|
| | bb_dict = {} |
| | for line in bb_lines: |
| | name, l, t, w, h = line.split(',') |
| | name = name[1:-1] |
| | l, t, w, h = [ int(x) for x in (l, t, w, h) ] |
| | bb_dict[name] = (l,t,w, h) |
| |
|
| |
|
| | output_path = input_path.parent / (input_path.name + '_out') |
| |
|
| | dir_names = pathex.get_all_dir_names(input_path) |
| |
|
| | if not output_path.exists(): |
| | output_path.mkdir(parents=True, exist_ok=True) |
| |
|
| | data = [] |
| | for dir_name in io.progress_bar_generator(dir_names, "Collecting"): |
| | cur_input_path = input_path / dir_name |
| | cur_output_path = output_path / dir_name |
| |
|
| | if not cur_output_path.exists(): |
| | cur_output_path.mkdir(parents=True, exist_ok=True) |
| |
|
| | input_path_image_paths = pathex.get_image_paths(cur_input_path) |
| |
|
| | for filename in input_path_image_paths: |
| | filename_path = Path(filename) |
| |
|
| | name = filename_path.parent.name + '/' + filename_path.stem |
| | if name not in bb_dict: |
| | continue |
| |
|
| | l,t,w,h = bb_dict[name] |
| | if min(w,h) < 128: |
| | continue |
| |
|
| | data += [ ExtractSubprocessor.Data(filename=filename,rects=[ (l,t,l+w,t+h) ], landmarks_accurate=False, force_output_path=cur_output_path ) ] |
| |
|
| | face_type = FaceType.fromString('full_face') |
| |
|
| | io.log_info ('Performing 2nd pass...') |
| | data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False).run() |
| |
|
| | io.log_info ('Performing 3rd pass...') |
| | ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=None).run() |
| |
|
| |
|
| | """ |
| | import code |
| | code.interact(local=dict(globals(), **locals())) |
| | |
| | data_len = len(data) |
| | i = 0 |
| | while i < data_len-1: |
| | i_name = Path(data[i].filename).parent.name |
| | |
| | sub_data = [] |
| | |
| | for j in range (i, data_len): |
| | j_name = Path(data[j].filename).parent.name |
| | if i_name == j_name: |
| | sub_data += [ data[j] ] |
| | else: |
| | break |
| | i = j |
| | |
| | cur_output_path = output_path / i_name |
| | |
| | io.log_info (f"Processing: {str(cur_output_path)}, {i}/{data_len} ") |
| | |
| | if not cur_output_path.exists(): |
| | cur_output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | for dir_name in dir_names: |
| | |
| | cur_input_path = input_path / dir_name |
| | cur_output_path = output_path / dir_name |
| | |
| | input_path_image_paths = pathex.get_image_paths(cur_input_path) |
| | l = len(input_path_image_paths) |
| | #if l < 250 or l > 350: |
| | # continue |
| | |
| | io.log_info (f"Processing: {str(cur_input_path)} ") |
| | |
| | if not cur_output_path.exists(): |
| | cur_output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | data = [] |
| | for filename in input_path_image_paths: |
| | filename_path = Path(filename) |
| | |
| | name = filename_path.parent.name + '/' + filename_path.stem |
| | if name not in bb_dict: |
| | continue |
| | |
| | bb = bb_dict[name] |
| | l,t,w,h = bb |
| | if min(w,h) < 128: |
| | continue |
| | |
| | data += [ ExtractSubprocessor.Data(filename=filename,rects=[ (l,t,l+w,t+h) ], landmarks_accurate=False ) ] |
| | |
| | |
| | |
| | io.log_info ('Performing 2nd pass...') |
| | data = ExtractSubprocessor (data, 'landmarks', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False).run() |
| | |
| | io.log_info ('Performing 3rd pass...') |
| | data = ExtractSubprocessor (data, 'final', 256, face_type, debug_dir=None, multi_gpu=False, cpu_only=False, manual=False, final_output_path=cur_output_path).run() |
| | |
| | |
| | io.log_info (f"Sorting: {str(cur_output_path)} ") |
| | Sorter.main (input_path=str(cur_output_path), sort_by_method='hist') |
| | |
| | import code |
| | code.interact(local=dict(globals(), **locals())) |
| | |
| | #try: |
| | # io.log_info (f"Removing: {str(cur_input_path)} ") |
| | # shutil.rmtree(cur_input_path) |
| | #except: |
| | # io.log_info (f"unable to remove: {str(cur_input_path)} ") |
| | |
| | |
| | |
| | |
| | def extract_vggface2_dataset(input_dir, device_args={} ): |
| | multi_gpu = device_args.get('multi_gpu', False) |
| | cpu_only = device_args.get('cpu_only', False) |
| | |
| | input_path = Path(input_dir) |
| | if not input_path.exists(): |
| | raise ValueError('Input directory not found. Please ensure it exists.') |
| | |
| | output_path = input_path.parent / (input_path.name + '_out') |
| | |
| | dir_names = pathex.get_all_dir_names(input_path) |
| | |
| | if not output_path.exists(): |
| | output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | |
| | for dir_name in dir_names: |
| | |
| | cur_input_path = input_path / dir_name |
| | cur_output_path = output_path / dir_name |
| | |
| | l = len(pathex.get_image_paths(cur_input_path)) |
| | if l < 250 or l > 350: |
| | continue |
| | |
| | io.log_info (f"Processing: {str(cur_input_path)} ") |
| | |
| | if not cur_output_path.exists(): |
| | cur_output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | Extractor.main( str(cur_input_path), |
| | str(cur_output_path), |
| | detector='s3fd', |
| | image_size=256, |
| | face_type='full_face', |
| | max_faces_from_image=1, |
| | device_args=device_args ) |
| | |
| | io.log_info (f"Sorting: {str(cur_input_path)} ") |
| | Sorter.main (input_path=str(cur_output_path), sort_by_method='hist') |
| | |
| | try: |
| | io.log_info (f"Removing: {str(cur_input_path)} ") |
| | shutil.rmtree(cur_input_path) |
| | except: |
| | io.log_info (f"unable to remove: {str(cur_input_path)} ") |
| | |
| | """ |
| |
|
| | |
| | def dev_test_68(input_dir ): |
| | |
| | input_path = Path(input_dir) |
| | if not input_path.exists(): |
| | raise ValueError('input_dir not found. Please ensure it exists.') |
| |
|
| | output_path = input_path.parent / (input_path.name+'_aligned') |
| |
|
| | io.log_info(f'Output dir is % {output_path}') |
| |
|
| | if output_path.exists(): |
| | output_images_paths = pathex.get_image_paths(output_path) |
| | if len(output_images_paths) > 0: |
| | io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False ) |
| | for filename in output_images_paths: |
| | Path(filename).unlink() |
| | else: |
| | output_path.mkdir(parents=True, exist_ok=True) |
| |
|
| | images_paths = pathex.get_image_paths(input_path) |
| |
|
| | for filepath in io.progress_bar_generator(images_paths, "Processing"): |
| | filepath = Path(filepath) |
| |
|
| |
|
| | pts_filepath = filepath.parent / (filepath.stem+'.pts') |
| | if pts_filepath.exists(): |
| | pts = pts_filepath.read_text() |
| | pts_lines = pts.split('\n') |
| |
|
| | lmrk_lines = None |
| | for pts_line in pts_lines: |
| | if pts_line == '{': |
| | lmrk_lines = [] |
| | elif pts_line == '}': |
| | break |
| | else: |
| | if lmrk_lines is not None: |
| | lmrk_lines.append (pts_line) |
| |
|
| | if lmrk_lines is not None and len(lmrk_lines) == 68: |
| | try: |
| | lmrks = [ np.array ( lmrk_line.strip().split(' ') ).astype(np.float32).tolist() for lmrk_line in lmrk_lines] |
| | except Exception as e: |
| | print(e) |
| | print(filepath) |
| | continue |
| |
|
| | rect = LandmarksProcessor.get_rect_from_landmarks(lmrks) |
| |
|
| | output_filepath = output_path / (filepath.stem+'.jpg') |
| |
|
| | img = cv2_imread(filepath) |
| | img = imagelib.normalize_channels(img, 3) |
| | cv2_imwrite(output_filepath, img, [int(cv2.IMWRITE_JPEG_QUALITY), 95] ) |
| | |
| | raise Exception("unimplemented") |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | io.log_info("Done.") |
| |
|
| | |
| | def extract_umd_csv(input_file_csv, |
| | face_type='full_face', |
| | device_args={} ): |
| |
|
| | |
| | multi_gpu = device_args.get('multi_gpu', False) |
| | cpu_only = device_args.get('cpu_only', False) |
| | face_type = FaceType.fromString(face_type) |
| |
|
| | input_file_csv_path = Path(input_file_csv) |
| | if not input_file_csv_path.exists(): |
| | raise ValueError('input_file_csv not found. Please ensure it exists.') |
| |
|
| | input_file_csv_root_path = input_file_csv_path.parent |
| | output_path = input_file_csv_path.parent / ('aligned_' + input_file_csv_path.name) |
| |
|
| | io.log_info("Output dir is %s." % (str(output_path)) ) |
| |
|
| | if output_path.exists(): |
| | output_images_paths = pathex.get_image_paths(output_path) |
| | if len(output_images_paths) > 0: |
| | io.input_bool("WARNING !!! \n %s contains files! \n They will be deleted. \n Press enter to continue." % (str(output_path)), False ) |
| | for filename in output_images_paths: |
| | Path(filename).unlink() |
| | else: |
| | output_path.mkdir(parents=True, exist_ok=True) |
| |
|
| | try: |
| | with open( str(input_file_csv_path), 'r') as f: |
| | csv_file = f.read() |
| | except Exception as e: |
| | io.log_err("Unable to open or read file " + str(input_file_csv_path) + ": " + str(e) ) |
| | return |
| |
|
| | strings = csv_file.split('\n') |
| | keys = strings[0].split(',') |
| | keys_len = len(keys) |
| | csv_data = [] |
| | for i in range(1, len(strings)): |
| | values = strings[i].split(',') |
| | if keys_len != len(values): |
| | io.log_err("Wrong string in csv file, skipping.") |
| | continue |
| |
|
| | csv_data += [ { keys[n] : values[n] for n in range(keys_len) } ] |
| |
|
| | data = [] |
| | for d in csv_data: |
| | filename = input_file_csv_root_path / d['FILE'] |
| |
|
| |
|
| | x,y,w,h = float(d['FACE_X']), float(d['FACE_Y']), float(d['FACE_WIDTH']), float(d['FACE_HEIGHT']) |
| |
|
| | data += [ ExtractSubprocessor.Data(filename=filename, rects=[ [x,y,x+w,y+h] ]) ] |
| |
|
| | images_found = len(data) |
| | faces_detected = 0 |
| | if len(data) > 0: |
| | io.log_info ("Performing 2nd pass from csv file...") |
| | data = ExtractSubprocessor (data, 'landmarks', multi_gpu=multi_gpu, cpu_only=cpu_only).run() |
| |
|
| | io.log_info ('Performing 3rd pass...') |
| | data = ExtractSubprocessor (data, 'final', face_type, None, multi_gpu=multi_gpu, cpu_only=cpu_only, manual=False, final_output_path=output_path).run() |
| | faces_detected += sum([d.faces_detected for d in data]) |
| |
|
| |
|
| | io.log_info ('-------------------------') |
| | io.log_info ('Images found: %d' % (images_found) ) |
| | io.log_info ('Faces detected: %d' % (faces_detected) ) |
| | io.log_info ('-------------------------') |
| |
|
| |
|
| | |
| | def dev_test1(input_dir): |
| | |
| | |
| | image_size = 1024 |
| | face_type = FaceType.HEAD |
| | |
| | input_path = Path(input_dir) |
| | images_path = input_path / 'images' |
| | if not images_path.exists: |
| | raise ValueError('LaPa dataset: images folder not found.') |
| | labels_path = input_path / 'labels' |
| | if not labels_path.exists: |
| | raise ValueError('LaPa dataset: labels folder not found.') |
| | landmarks_path = input_path / 'landmarks' |
| | if not landmarks_path.exists: |
| | raise ValueError('LaPa dataset: landmarks folder not found.') |
| | |
| | output_path = input_path / 'out' |
| | if output_path.exists(): |
| | output_images_paths = pathex.get_image_paths(output_path) |
| | if len(output_images_paths) != 0: |
| | io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n") |
| | for filename in output_images_paths: |
| | Path(filename).unlink() |
| | output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | data = [] |
| | |
| | img_paths = pathex.get_image_paths (images_path) |
| | for filename in img_paths: |
| | filepath = Path(filename) |
| |
|
| | landmark_filepath = landmarks_path / (filepath.stem + '.txt') |
| | if not landmark_filepath.exists(): |
| | raise ValueError(f'no landmarks for {filepath}') |
| | |
| | |
| | |
| | lm = landmark_filepath.read_text() |
| | lm = lm.split('\n') |
| | if int(lm[0]) != 106: |
| | raise ValueError(f'wrong landmarks format in {landmark_filepath}') |
| | |
| | lmrks = [] |
| | for i in range(106): |
| | x,y = lm[i+1].split(' ') |
| | x,y = float(x), float(y) |
| | lmrks.append ( (x,y) ) |
| | |
| | lmrks = np.array(lmrks) |
| | |
| | l,t = np.min(lmrks, 0) |
| | r,b = np.max(lmrks, 0) |
| | |
| | l,t,r,b = ( int(x) for x in (l,t,r,b) ) |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | data += [ ExtractSubprocessor.Data(filepath=filepath, rects=[ (l,t,r,b) ]) ] |
| |
|
| | |
| | |
| | |
| | if len(data) > 0: |
| | device_config = nn.DeviceConfig.BestGPU() |
| | |
| | io.log_info ("Performing 2nd pass...") |
| | data = ExtractSubprocessor (data, 'landmarks', image_size, 95, face_type, device_config=device_config).run() |
| | io.log_info ("Performing 3rd pass...") |
| | data = ExtractSubprocessor (data, 'final', image_size, 95, face_type, final_output_path=output_path, device_config=device_config).run() |
| |
|
| |
|
| | for filename in pathex.get_image_paths (output_path): |
| | filepath = Path(filename) |
| | |
| | |
| | dflimg = DFLJPG.load(filepath) |
| | |
| | src_filename = dflimg.get_source_filename() |
| | image_to_face_mat = dflimg.get_image_to_face_mat() |
| |
|
| | label_filepath = labels_path / ( Path(src_filename).stem + '.png') |
| | if not label_filepath.exists(): |
| | raise ValueError(f'{label_filepath} does not exist') |
| | |
| | mask = cv2_imread(label_filepath) |
| | |
| | mask[mask > 0] = 1 |
| | mask = cv2.warpAffine(mask, image_to_face_mat, (image_size, image_size), cv2.INTER_LINEAR) |
| | mask = cv2.blur(mask, (3,3) ) |
| | |
| | |
| | |
| | |
| | dflimg.set_xseg_mask(mask) |
| | dflimg.save() |
| | |
| | |
| | import code |
| | code.interact(local=dict(globals(), **locals())) |
| | |
| |
|
| | def dev_resave_pngs(input_dir): |
| | input_path = Path(input_dir) |
| | if not input_path.exists(): |
| | raise ValueError('input_dir not found. Please ensure it exists.') |
| |
|
| | images_paths = pathex.get_image_paths(input_path, image_extensions=['.png'], subdirs=True, return_Path_class=True) |
| |
|
| | for filepath in io.progress_bar_generator(images_paths,"Processing"): |
| | cv2_imwrite(filepath, cv2_imread(filepath)) |
| |
|
| |
|
| | def dev_segmented_trash(input_dir): |
| | input_path = Path(input_dir) |
| | if not input_path.exists(): |
| | raise ValueError('input_dir not found. Please ensure it exists.') |
| |
|
| | output_path = input_path.parent / (input_path.name+'_trash') |
| | output_path.mkdir(parents=True, exist_ok=True) |
| |
|
| | images_paths = pathex.get_image_paths(input_path, return_Path_class=True) |
| |
|
| | trash_paths = [] |
| | for filepath in images_paths: |
| | json_file = filepath.parent / (filepath.stem +'.json') |
| | if not json_file.exists(): |
| | trash_paths.append(filepath) |
| |
|
| | for filepath in trash_paths: |
| |
|
| | try: |
| | filepath.rename ( output_path / filepath.name ) |
| | except: |
| | io.log_info ('fail to trashing %s' % (src.name) ) |
| |
|
| |
|
| |
|
| | def dev_test(input_dir): |
| | """ |
| | extract FaceSynthetics dataset https://github.com/microsoft/FaceSynthetics |
| | |
| | BACKGROUND = 0 |
| | SKIN = 1 |
| | NOSE = 2 |
| | RIGHT_EYE = 3 |
| | LEFT_EYE = 4 |
| | RIGHT_BROW = 5 |
| | LEFT_BROW = 6 |
| | RIGHT_EAR = 7 |
| | LEFT_EAR = 8 |
| | MOUTH_INTERIOR = 9 |
| | TOP_LIP = 10 |
| | BOTTOM_LIP = 11 |
| | NECK = 12 |
| | HAIR = 13 |
| | BEARD = 14 |
| | CLOTHING = 15 |
| | GLASSES = 16 |
| | HEADWEAR = 17 |
| | FACEWEAR = 18 |
| | IGNORE = 255 |
| | """ |
| | |
| | |
| | image_size = 1024 |
| | face_type = FaceType.WHOLE_FACE |
| | |
| | input_path = Path(input_dir) |
| | |
| | |
| | |
| | output_path = input_path.parent / f'{input_path.name}_out' |
| | if output_path.exists(): |
| | output_images_paths = pathex.get_image_paths(output_path) |
| | if len(output_images_paths) != 0: |
| | io.input(f"\n WARNING !!! \n {output_path} contains files! \n They will be deleted. \n Press enter to continue.\n") |
| | for filename in output_images_paths: |
| | Path(filename).unlink() |
| | output_path.mkdir(parents=True, exist_ok=True) |
| | |
| | data = [] |
| | |
| | for filepath in io.progress_bar_generator(pathex.get_paths(input_path), "Processing"): |
| | if filepath.suffix == '.txt': |
| | |
| | image_filepath = filepath.parent / f'{filepath.name.split("_")[0]}.png' |
| | if not image_filepath.exists(): |
| | print(f'{image_filepath} does not exist, skipping') |
| | |
| | lmrks = [] |
| | for lmrk_line in filepath.read_text().split('\n'): |
| | if len(lmrk_line) == 0: |
| | continue |
| | |
| | x, y = lmrk_line.split(' ') |
| | x, y = float(x), float(y) |
| | |
| | lmrks.append( (x,y) ) |
| | |
| | lmrks = np.array(lmrks[:68], np.float32) |
| | rect = LandmarksProcessor.get_rect_from_landmarks(lmrks) |
| | data += [ ExtractSubprocessor.Data(filepath=image_filepath, rects=[rect], landmarks=[ lmrks ] ) ] |
| |
|
| | if len(data) > 0: |
| | io.log_info ("Performing 3rd pass...") |
| | data = ExtractSubprocessor (data, 'final', image_size, 95, face_type, final_output_path=output_path, device_config=nn.DeviceConfig.CPU()).run() |
| |
|
| | for filename in io.progress_bar_generator(pathex.get_image_paths (output_path), "Processing"): |
| | filepath = Path(filename) |
| | |
| | dflimg = DFLJPG.load(filepath) |
| | |
| | src_filename = dflimg.get_source_filename() |
| | image_to_face_mat = dflimg.get_image_to_face_mat() |
| | |
| | seg_filepath = input_path / ( Path(src_filename).stem + '_seg.png') |
| | if not seg_filepath.exists(): |
| | raise ValueError(f'{seg_filepath} does not exist') |
| | |
| | seg = cv2_imread(seg_filepath) |
| | seg_inds = np.isin(seg, [1,2,3,4,5,6,9,10,11]) |
| | seg[~seg_inds] = 0 |
| | seg[seg_inds] = 1 |
| | seg = seg.astype(np.float32) |
| | seg = cv2.warpAffine(seg, image_to_face_mat, (image_size, image_size), cv2.INTER_LANCZOS4) |
| | dflimg.set_xseg_mask(seg) |
| | dflimg.save() |
| | |