diff --git a/image_compressor.py b/image_compressor.py index 576a8c5..cc634b0 100644 --- a/image_compressor.py +++ b/image_compressor.py @@ -12,22 +12,32 @@ from PIL import Image, ImageFile from pathlib import Path from concurrent.futures import ThreadPoolExecutor, as_completed +# Константы +TARGET_SIZE = 2 * 1024 * 1024 +MIN_SIZE = TARGET_SIZE +MAX_WORKERS = min(32, (multiprocessing.cpu_count() or 1) * 5) +DB_PATH = "image_compressor.db" + +# Настройки Pillow Image.MAX_IMAGE_PIXELS = None -warnings.simplefilter("ignore", Image.DecompressionBombWarning) ImageFile.LOAD_TRUNCATED_IMAGES = True warnings.filterwarnings("ignore", category=UserWarning, module="PIL") +warnings.simplefilter("ignore", Image.DecompressionBombWarning) +# Логирование logging.basicConfig( filename="image_compressor.log", level=logging.INFO, format="%(asctime)s - %(message)s", ) -MIN_SIZE = 2 * 1024 * 1024 -TARGET_SIZE_MB = 2 * 1024 * 1024 -MAX_WORKERS = min(32, (multiprocessing.cpu_count() or 1) * 5) +# Глобальные счётчики +processed_count = 0 +skipped_count = 0 +total_saved_bytes = 0 +db_lock = threading.Lock() -DB_PATH = "image_compressor.db" +# Инициализация БД conn = sqlite3.connect(DB_PATH, check_same_thread=False) cursor = conn.cursor() cursor.execute( @@ -35,27 +45,25 @@ cursor.execute( ) conn.commit() -processed_count = 0 -skipped_count = 0 -total_saved_bytes = 0 -db_lock = threading.Lock() + +# --- Утилиты --- -def get_tool_path(tool_name): +def get_tool_path(name: str) -> Path: if hasattr(sys, "_MEIPASS"): - return Path(sys._MEIPASS) / "tools" / tool_name - return Path("tools") / tool_name + return Path(sys._MEIPASS) / "tools" / name + return Path("tools") / name -def file_hash(path): +def file_hash(path: Path) -> str: hasher = hashlib.sha256() - with open(path, "rb") as f: + with path.open("rb") as f: for chunk in iter(lambda: f.read(65536), b""): hasher.update(chunk) return hasher.hexdigest() -def extract_exif(path): +def extract_exif(path: Path): try: with Image.open(path) as img: return img.info.get("exif") @@ -63,18 +71,20 @@ def extract_exif(path): return None -def inject_exif(jpeg_path, exif_bytes): +def inject_exif(path: Path, exif): try: - with Image.open(jpeg_path) as img: - rgb = img.convert("RGB") - rgb.save(jpeg_path, "JPEG", exif=exif_bytes) + with Image.open(path) as img: + img.convert("RGB").save(path, "JPEG", exif=exif) except Exception as e: - logging.error(f"Ошибка при вставке EXIF в {jpeg_path}: {e}") + logging.error(f"Не удалось вставить EXIF в {path}: {e}") + + +# --- Сжатие --- def convert_png_to_jpeg(path: Path) -> Path | None: + temp_path = path.with_suffix(".jpg") try: - temp_path = path.with_suffix(".jpg") with Image.open(path) as img: img.convert("RGB").save( temp_path, "JPEG", quality=85, optimize=True @@ -82,103 +92,99 @@ def convert_png_to_jpeg(path: Path) -> Path | None: if temp_path.stat().st_size < path.stat().st_size: path.unlink() return temp_path - else: - temp_path.unlink() + temp_path.unlink() except Exception as e: - logging.error(f"Ошибка при конвертации PNG в JPEG для {path}: {e}") + logging.error(f"Ошибка при конвертации PNG в JPEG: {path}: {e}") return None -def compress_with_external(path: str, ext: str) -> tuple[bool, Path]: - path = Path(path) +def compress_with_external( + path: Path, ext: str +) -> tuple[bool, Path] | tuple[None, Path]: original_size = path.stat().st_size tmp_path = path.with_name(path.stem + ".compressed" + path.suffix) - target_size = TARGET_SIZE_MB - - exif_data = extract_exif(path) + exif = extract_exif(path) try: if ext == ".png": - new_path = convert_png_to_jpeg(path) - if not new_path: + converted = convert_png_to_jpeg(path) + if not converted: return False, path - path = new_path + path = converted ext = ".jpg" + tool = None + args = [] + quality = 85 + if ext in [".jpg", ".jpeg"]: tool = get_tool_path("cjpeg-static.exe") - quality = 85 - while True: - subprocess.run( - [ - tool, - "-quality", - str(quality), - "-outfile", - str(tmp_path), - str(path), - ], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if os.path.getsize(tmp_path) <= target_size or quality < 50: - break - quality -= 5 - + args_base = [ + tool, + "-quality", + "", + "-outfile", + str(tmp_path), + str(path), + ] elif ext == ".webp": tool = get_tool_path("cwebp.exe") - quality = 80 - while True: - subprocess.run( - [ - tool, - str(path), - "-o", - str(tmp_path), - "-m", - "6", - "-q", - str(quality), - "-metadata", - "all", - ], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - if os.path.getsize(tmp_path) <= target_size or quality < 50: - break - quality -= 5 + args_base = [ + tool, + str(path), + "-o", + str(tmp_path), + "-m", + "6", + "-q", + "", + "-metadata", + "all", + ] else: return False, path + + while quality >= 50: + args = args_base.copy() + if ext == ".webp": + args[args.index("")] = str(quality) + else: + args[args.index("")] = str(quality) + + subprocess.run( + args, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + if tmp_path.stat().st_size <= TARGET_SIZE or quality <= 50: + break + quality -= 5 + except FileNotFoundError: return None, path except Exception as e: - logging.error(f"Ошибка внешнего сжатия {path}: {e}") + logging.error(f"Ошибка при сжатии {path} внешней утилитой: {e}") return False, path if tmp_path.exists(): - if exif_data: - inject_exif(tmp_path, exif_data) - new_size = tmp_path.stat().st_size - if new_size < original_size: + if tmp_path.stat().st_size < original_size: + if exif: + inject_exif(tmp_path, exif) tmp_path.replace(path) return True, path - else: - tmp_path.unlink() + tmp_path.unlink() return False, path -def compress_with_pillow(path: str) -> tuple[bool, Path]: - path = Path(path) +def compress_with_pillow(path: Path) -> tuple[bool, Path]: original_size = path.stat().st_size temp_path = path.with_name(path.stem + ".pillowtmp" + path.suffix) try: with Image.open(path) as img: + exif = img.info.get("exif") img_format = img.format - exif = img.info.get("exif", None) quality = 85 while quality >= 50: img.save( @@ -188,32 +194,26 @@ def compress_with_pillow(path: str) -> tuple[bool, Path]: quality=quality, exif=exif, ) - if temp_path.stat().st_size <= TARGET_SIZE_MB: + if temp_path.stat().st_size <= TARGET_SIZE: break quality -= 5 if temp_path.exists() and temp_path.stat().st_size < original_size: temp_path.replace(path) return True, path - elif temp_path.exists(): - temp_path.unlink() - return False, path + temp_path.unlink() except Exception as e: - logging.error(f"Ошибка Pillow для {path}: {e}") + logging.error(f"Pillow не смог сжать {path}: {e}") return False, path -def compress_image(path: str, fallback_to_pillow: bool = False): +def compress_image(path: Path, use_fallback: bool = False): global processed_count, skipped_count, total_saved_bytes try: - path = Path(path) - original_size = path.stat().st_size - if original_size < MIN_SIZE: + if path.stat().st_size < MIN_SIZE: skipped_count += 1 - logging.info( - f"Пропущено (уже малый): {path} ({original_size / 1024:.1f} KB)" - ) + logging.info(f"Пропущено (малый размер): {path}") return h = file_hash(path) @@ -223,101 +223,88 @@ def compress_image(path: str, fallback_to_pillow: bool = False): ) if cursor.fetchone(): skipped_count += 1 - logging.info( - f"Пропущено (уже сжато): {path} ({original_size / 1024:.1f} KB)" - ) + logging.info(f"Пропущено (уже обработано): {path}") return ext = path.suffix.lower() - result, path = compress_with_external(path, ext) + result, final_path = compress_with_external(path, ext) - if result is None and fallback_to_pillow: - result, path = compress_with_pillow(path) + if result is None and use_fallback: + result, final_path = compress_with_pillow(path) + + new_size = final_path.stat().st_size + original_size = path.stat().st_size - new_size = path.stat().st_size if result and new_size < original_size: - saved_bytes = original_size - new_size - total_saved_bytes += saved_bytes + saved = original_size - new_size + total_saved_bytes += saved percent = (1 - new_size / original_size) * 100 logging.info( - f"Сжато: {path} ({original_size / 1024:.1f} KB -> {new_size / 1024:.1f} KB, сохранено {percent:.2f}%)" + f"Сжато: {path} ({original_size//1024}KB -> {new_size//1024}KB, -{percent:.2f}%)" ) else: - logging.info( - f"Пропущено (не меньше): {path} ({original_size / 1024:.1f} KB)" - ) + logging.info(f"Пропущено (не уменьшилось): {path}") - h = file_hash(path) + h = file_hash(final_path) with db_lock: cursor.execute( "INSERT INTO processed_images(hash, filename) VALUES(?, ?)", - (h, path.name), + (h, final_path.name), ) conn.commit() processed_count += 1 except Exception as e: - logging.error(f"Ошибка обработки {path}: {e}") + logging.error(f"Ошибка при обработке {path}: {e}") -def find_images(root: str): - exts = {".png", ".jpg", ".jpeg", ".webp"} +# --- Основной процесс --- + + +def find_images(root: Path): + exts = {".jpg", ".jpeg", ".png", ".webp"} for dirpath, _, filenames in os.walk(root): - for f in filenames: - if Path(f).suffix.lower() in exts: - yield Path(dirpath) / f + for name in filenames: + if Path(name).suffix.lower() in exts: + yield Path(dirpath) / name def main(): - parser = argparse.ArgumentParser(description="Компрессор изображенеий") + parser = argparse.ArgumentParser( + description="Сжатие изображений до заданного размера" + ) parser.add_argument( - "--input", - help="Путь для сканирования. По умолчанию текущая директория.", - default=None, + "--input", help="Папка со входными изображениями", default=os.getcwd() ) args = parser.parse_args() - if args.input: - input_dir = Path(args.input) - else: - print( - "Не указан путь. Обрабатывать текущую папку и все подпапки? [y/n]" - ) - choice = input().strip().lower() - if choice != "y": - print("Отменено.") - return - input_dir = Path(os.getcwd()) + input_dir = Path(args.input) - print("Проверка утилит...") - required_tools = ["cjpeg-static.exe", "cwebp.exe"] - missing = [ - tool - for tool in required_tools - if not os.path.exists(get_tool_path(tool)) - ] + print("Проверка необходимых инструментов...") + required = ["cjpeg-static.exe", "cwebp.exe"] + missing = [t for t in required if not get_tool_path(t).exists()] - fallback = False + use_fallback = False if missing: - print("Не найдены внешние утилиты:", ", ".join(missing)) - print("Использовать Pillow вместо них, где возможно? [y/n]") - if input().strip().lower() == "y": - fallback = True - else: - print("Без утилит работа невозможна.") + print("Не найдены:", ", ".join(missing)) + choice = input("Использовать Pillow? [y/n]: ").strip().lower() + if choice != "y": + print("Работа прервана.") return + use_fallback = True files = list(find_images(input_dir)) print(f"Найдено {len(files)} изображений.") with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: - futures = [executor.submit(compress_image, f, fallback) for f in files] + futures = [ + executor.submit(compress_image, f, use_fallback) for f in files + ] for i, _ in enumerate(as_completed(futures), 1): - print(f"\rОбработка изображений: {i}/{len(files)}", end="") + print(f"\rОбработка: {i}/{len(files)}", end="") - print("\nОбработка завершена.") - print(f"Всего обработано: {processed_count}") - print(f"Пропущено: {skipped_count}") + print("\nГотово.") + print(f"Обработано: {processed_count}, Пропущено: {skipped_count}") print(f"Сэкономлено: {total_saved_bytes / 1024 / 1024:.2f} MB") logging.info( f"Завершено. Обработано: {processed_count}, Пропущено: {skipped_count}, Сэкономлено: {total_saved_bytes / 1024 / 1024:.2f} MB" @@ -329,4 +316,4 @@ if __name__ == "__main__": main() except Exception as e: logging.exception("Ошибка в main()") - input() + input("Нажмите Enter для выхода...")