Исправление сравнения обжатия

This commit is contained in:
2025-05-14 12:03:05 +03:00
parent ddc367baa7
commit bace507717

View File

@@ -11,41 +11,41 @@ import warnings
from PIL import Image, ImageFile
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple, Optional
# Константы
TARGET_SIZE = 2 * 1024 * 1024
# --- Константы ---
TARGET_SIZE = 2 * 1024 * 1024 # 2MB
MIN_SIZE = TARGET_SIZE
MAX_WORKERS = min(32, (multiprocessing.cpu_count() or 1) * 5)
DB_PATH = "image_compressor.db"
# Настройки Pillow
# --- Настройки Pillow ---
Image.MAX_IMAGE_PIXELS = None
ImageFile.LOAD_TRUNCATED_IMAGES = True
warnings.filterwarnings("ignore", category=UserWarning, module="PIL")
warnings.simplefilter("ignore", Image.DecompressionBombWarning)
# Логирование
# --- Логирование ---
logging.basicConfig(
filename="image_compressor.log",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
)
# Глобальные счётчики
# --- Глобальные переменные ---
processed_count = 0
skipped_count = 0
total_saved_bytes = 0
db_lock = threading.Lock()
# Инициализация БД
# --- База данных ---
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
cursor = conn.cursor()
cursor.execute(
"CREATE TABLE IF NOT EXISTS processed_images (hash TEXT PRIMARY KEY, filename TEXT)"
"CREATE TABLE IF NOT EXISTS processed_images (hash TEXT PRIMARY KEY, filename TEXT, reduced BOOLEAN)"
)
conn.commit()
# --- Утилиты ---
@@ -82,7 +82,7 @@ def inject_exif(path: Path, exif):
# --- Сжатие ---
def convert_png_to_jpeg(path: Path) -> Path | None:
def convert_png_to_jpeg(path: Path) -> Optional[Path]:
temp_path = path.with_suffix(".jpg")
try:
with Image.open(path) as img:
@@ -100,7 +100,7 @@ def convert_png_to_jpeg(path: Path) -> Path | None:
def compress_with_external(
path: Path, ext: str
) -> tuple[bool, Path] | tuple[None, Path]:
) -> Tuple[Optional[bool], Path]:
original_size = path.stat().st_size
tmp_path = path.with_name(path.stem + ".compressed" + path.suffix)
exif = extract_exif(path)
@@ -112,15 +112,12 @@ def compress_with_external(
return False, path
path = converted
ext = ".jpg"
tool = None
args = []
quality = 85
original_size = path.stat().st_size
if ext in [".jpg", ".jpeg"]:
tool = get_tool_path("cjpeg-static.exe")
args_base = [
tool,
str(tool),
"-quality",
"",
"-outfile",
@@ -130,7 +127,7 @@ def compress_with_external(
elif ext == ".webp":
tool = get_tool_path("cwebp.exe")
args_base = [
tool,
str(tool),
str(path),
"-o",
str(tmp_path),
@@ -144,20 +141,17 @@ def compress_with_external(
else:
return False, path
quality = 85
while quality >= 50:
args = args_base.copy()
if ext == ".webp":
args[args.index("")] = str(quality)
else:
args[args.index("")] = str(quality)
args[args.index("")] = str(quality)
subprocess.run(
args,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if tmp_path.stat().st_size <= TARGET_SIZE or quality <= 50:
if tmp_path.stat().st_size <= TARGET_SIZE:
break
quality -= 5
@@ -177,7 +171,7 @@ def compress_with_external(
return False, path
def compress_with_pillow(path: Path) -> tuple[bool, Path]:
def compress_with_pillow(path: Path) -> Tuple[bool, Path]:
original_size = path.stat().st_size
temp_path = path.with_name(path.stem + ".pillowtmp" + path.suffix)
@@ -211,11 +205,14 @@ def compress_image(path: Path, use_fallback: bool = False):
global processed_count, skipped_count, total_saved_bytes
try:
if path.stat().st_size < MIN_SIZE:
if not path.exists() or path.stat().st_size < MIN_SIZE:
skipped_count += 1
logging.info(f"Пропущено (малый размер): {path}")
logging.info(
f"Пропущено (малый размер или не найден): {path} ({path.stat().st_size // 1024}KB)"
)
return
original_size = path.stat().st_size
h = file_hash(path)
with db_lock:
cursor.execute(
@@ -223,7 +220,9 @@ def compress_image(path: Path, use_fallback: bool = False):
)
if cursor.fetchone():
skipped_count += 1
logging.info(f"Пропущено (уже обработано): {path}")
logging.info(
f"Пропущено (уже обработано): {path} ({original_size // 1024}KB)"
)
return
ext = path.suffix.lower()
@@ -232,26 +231,32 @@ def compress_image(path: Path, use_fallback: bool = False):
if result is None and use_fallback:
result, final_path = compress_with_pillow(path)
if not final_path.exists():
logging.warning(f"Файл не найден после сжатия: {final_path}")
return
new_size = final_path.stat().st_size
original_size = path.stat().st_size
if result and new_size < original_size:
saved = original_size - new_size
total_saved_bytes += saved
percent = (1 - new_size / original_size) * 100
logging.info(
f"Сжато: {path} ({original_size//1024}KB -> {new_size//1024}KB, -{percent:.2f}%)"
)
else:
logging.info(f"Пропущено (не уменьшилось): {path}")
if result:
if new_size < original_size:
saved = original_size - new_size
total_saved_bytes += saved
percent = (1 - new_size / original_size) * 100
logging.info(
f"Сжато: {path} ({original_size//1024}KB -> {new_size//1024}KB, -{percent:.2f}%)"
)
else:
logging.info(
f"Пропущено (не уменьшилось): {path} ({new_size // 1024}KB)"
)
h = file_hash(final_path)
with db_lock:
cursor.execute(
"INSERT INTO processed_images(hash, filename) VALUES(?, ?)",
(h, final_path.name),
)
conn.commit()
h = file_hash(final_path)
with db_lock:
cursor.execute(
"INSERT INTO processed_images(hash, filename, reduced) VALUES(?, ?, ?)",
(h, final_path.name, new_size < original_size),
)
conn.commit()
processed_count += 1
except Exception as e:
@@ -269,6 +274,23 @@ def find_images(root: Path):
yield Path(dirpath) / name
def prepare_and_copy_files(input_dir: Path, output_dir: Path) -> list[Path]:
if input_dir.resolve() == output_dir.resolve():
return list(find_images(input_dir))
output_dir.mkdir(parents=True, exist_ok=True)
copied = []
for image in find_images(input_dir):
rel_path = image.relative_to(input_dir)
dest = output_dir / rel_path
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(image.read_bytes())
copied.append(dest)
return copied
def main():
parser = argparse.ArgumentParser(
description="Сжатие изображений до заданного размера"
@@ -276,9 +298,19 @@ def main():
parser.add_argument(
"--input", help="Папка со входными изображениями", default=os.getcwd()
)
parser.add_argument(
"--output", help="Папка для сжатых изображений", default=None
)
args = parser.parse_args()
input_dir = Path(args.input)
input_dir = Path(args.input).resolve()
output_dir = Path(args.output).resolve() if args.output else input_dir
print(f"Входная папка: {input_dir}")
print(f"Выходная папка: {output_dir}")
if input("Начать обработку? [y/n]: ").strip().lower() != "y":
print("Отменено.")
return
print("Проверка необходимых инструментов...")
required = ["cjpeg-static.exe", "cwebp.exe"]
@@ -293,7 +325,7 @@ def main():
return
use_fallback = True
files = list(find_images(input_dir))
files = prepare_and_copy_files(input_dir, output_dir)
print(f"Найдено {len(files)} изображений.")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: