| |
|
| |
|
| | import os
|
| | from PIL import Image
|
| | import numpy as np
|
| | import json
|
| |
|
# Setting the limit to None turns off Pillow's decompression-bomb size check,
# so arbitrarily large images can be opened.
# NOTE(review): only appropriate when the input files are trusted.
Image.MAX_IMAGE_PIXELS = None
|
| |
|
| | from concurrent.futures import ThreadPoolExecutor
|
| | from tqdm import tqdm
|
| |
|
# Upper bound on the total pixel count (width * height) of an output image.
max_pixels = 2048 * 2048

# Upper bound on the longer side of an output image, in pixels.
max_long_size = 4096
|
def has_alpha(img:Image.Image):
    """Return True when any band of *img* is 'A', 'a' or 'P'.

    Palette images (band 'P') are always reported as True, whether or not
    the palette actually contains transparent entries.
    """
    transparency_bands = {'A', 'a', 'P'}
    return any(name in transparency_bands for name in img.getbands())
|
| |
|
def add_white_background(img:Image.Image)->Image.Image:
    """Composite *img* over a white canvas of the same size.

    The input is converted to RGBA first; the returned image is RGBA as well.
    """
    rgba = img.convert('RGBA')
    canvas = Image.new('RGBA', rgba.size, (255, 255, 255))
    return Image.alpha_composite(canvas, rgba)
|
| |
|
def resize_image(image:Image.Image,
                 long_side_limit=None,
                 pixel_limit=None,
                 resample=None)->Image.Image:
    """Downscale *image* so it respects both a long-side and a pixel-count limit.

    The aspect ratio is preserved (up to integer truncation). Images that
    already satisfy both limits are returned unchanged (same object).

    Args:
        image: Image to shrink; only ``.size`` and ``.resize`` are used.
        long_side_limit: Maximum allowed length of the longer side. Defaults
            to the module-level ``max_long_size``.
        pixel_limit: Maximum allowed ``width * height``. Defaults to the
            module-level ``max_pixels``.
        resample: Resampling filter passed to ``image.resize``. Defaults to
            ``Image.BICUBIC``.

    Returns:
        The original image, or a resized copy whose sides are each >= 1 px.
    """
    if long_side_limit is None:
        long_side_limit = max_long_size
    if pixel_limit is None:
        pixel_limit = max_pixels

    width, height = image.size
    max_side = max(width, height)
    current_pixels = width * height

    # Nothing to do when the image is already within both limits.
    if max_side <= long_side_limit and current_pixels <= pixel_limit:
        return image

    # Use the stricter of the two constraints. The area constraint scales
    # with the square of the factor, hence the square root.
    scale = min(long_side_limit / max_side,
                (pixel_limit / current_pixels) ** 0.5)

    # Truncation keeps the result within the limits; the clamp prevents a
    # zero-sized dimension for extreme aspect ratios (e.g. 8192x1), which
    # would otherwise make the resize call fail.
    new_width = max(1, int(width * scale))
    new_height = max(1, int(height * scale))

    if resample is None:
        resample = Image.BICUBIC
    return image.resize((new_width, new_height), resample)
|
| |
|
def load_image(image_path:str)->"Image.Image | None":
    """Open, validate and normalise an image file to RGB.

    The image is fully decoded, downscaled via ``resize_image``, flattened
    onto a white background when ``has_alpha`` reports transparency, and
    finally converted to RGB mode if it is not already.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The processed RGB image, or ``None`` when the file cannot be read or
        processed (the failure is logged as a warning).
    """
    import logging

    try:
        with Image.open(image_path) as img:
            # Force a full decode so truncated/corrupt files fail here.
            img.load()
            # The array is discarded; this only checks that the pixel data
            # can be converted.
            # NOTE(review): likely redundant with img.load() -- confirm.
            np.array(img)
            img = resize_image(img)
            if has_alpha(img):
                img = add_white_background(img)
            if img.mode != "RGB":
                img = img.convert("RGB")
            return img
    except Exception as exc:
        # Best-effort batch processing: skip bad files, but do not swallow
        # KeyboardInterrupt/SystemExit the way a bare ``except:`` would.
        logging.getLogger(__name__).warning(
            "Skipping image %s: %s", image_path, exc)
        return None
|
| |
|
def get_image_metainfo(img):
    """Summarise the dimensions of *img*.

    Returns ``None`` when *img* is ``None``; otherwise a dict with the keys
    'width', 'height' and 'pixel_num' (width * height), read from ``img.size``.
    """
    if img is None:
        return None

    w, h = img.size
    info = {
        'width': w,
        'height': h,
        'pixel_num': w * h,
    }
    return info
|
| |
|
| |
|
def process_image(input_image_path:str,output_image_path:str):
    """Convert one image to WebP and write a JSON sidecar with its dimensions.

    The image is loaded through ``load_image``; when that returns ``None``
    nothing is written. Otherwise the image is saved to *output_image_path*
    as WebP (quality 90) and the result of ``get_image_metainfo`` is dumped
    to a ``.json`` file with the same base name.

    Args:
        input_image_path: Path of the source image.
        output_image_path: Destination path of the WebP file.
    """
    img = load_image(input_image_path)
    if img is None:
        return

    image_metainfo = get_image_metainfo(img)

    # Swap only the final extension. A plain str.replace(".webp", ".json")
    # would also rewrite ".webp" inside directory names, and would return the
    # image path itself (so the JSON overwrites the image) when the output
    # path does not contain ".webp".
    output_image_json_path = os.path.splitext(output_image_path)[0] + ".json"

    img.save(output_image_path, "WEBP", quality=90)
    with open(output_image_json_path, 'w', encoding="utf-8") as f:
        json.dump(image_metainfo, f, indent=4)
|
| |
|
def get_image_paths(input_dir, output_dir):
    """Yield ``(input_path, output_path)`` pairs for every image under *input_dir*.

    Files are matched case-insensitively by extension (.png, .jpg, .jpeg,
    .webp, .bmp). Each output path mirrors the file's location relative to
    *input_dir* inside *output_dir*, with the extension replaced by '.webp'.
    The output file's parent directory is created before the pair is yielded.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.webp', '.bmp')

    for folder, _subdirs, names in os.walk(input_dir):
        for name in names:
            if not name.lower().endswith(image_suffixes):
                continue

            source = os.path.join(folder, name)
            relative = os.path.relpath(source, input_dir)
            stem, _ext = os.path.splitext(relative)
            target = os.path.join(output_dir, stem + '.webp')

            # Mirror the source directory layout on the output side.
            os.makedirs(os.path.dirname(target), exist_ok=True)
            yield source, target
|
| |
|
def process_images_with_thread_pool(input_image_dir:str,
                                    output_image_dir:str,
                                    num_threads=16):
    """Convert every image under *input_image_dir* using a thread pool.

    One ``process_image`` job is submitted per pair produced by
    ``get_image_paths``; a tqdm progress bar advances as jobs finish.

    Args:
        input_image_dir: Directory tree to scan for source images.
        output_image_dir: Root directory for the converted files; created
            if it does not exist.
        num_threads: Maximum number of worker threads.

    Raises:
        Any exception raised inside a ``process_image`` job is re-raised
        here when that job's result is collected.
    """
    from concurrent.futures import as_completed

    os.makedirs(output_image_dir, exist_ok=True)
    image_paths = get_image_paths(input_image_dir, output_image_dir)

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(process_image, input_path, output_path)
            for input_path, output_path in image_paths
        ]

        # Wait on the futures directly instead of submitting extra blocking
        # "waiter" tasks to the same pool. as_completed lets the bar reflect
        # actual completion rather than submission order.
        for future in tqdm(as_completed(futures),
                           total=len(futures),
                           desc="Processing images"):
            future.result()
|
| |
|
if __name__ == "__main__":
    # Script entry point: convert the dataset's "test" folder into "output".
    source_dir = r"20240808\unsplash-research-dataset-lite-latest\test"
    target_dir = r"20240808\unsplash-research-dataset-lite-latest\output"

    process_images_with_thread_pool(
        input_image_dir=source_dir,
        output_image_dir=target_dir,
        num_threads=16,
    )