Files
vid2sheet/vid2sheet.py
sakamoto f3e9dbe805 sync
2024-08-29 10:26:20 -04:00

265 lines
8.7 KiB
Python

from utils import Download
from utils import Misc
from utils import log
from utils import src, dest, alt_temp, img_format, ignore_temp, verbose, paper_size
from tqdm import tqdm
import tempfile
import os
import cv2
import re
import numpy as np
import img2pdf
import shutil
# Temporary hard-coded inputs for manual testing (normally imported from utils):
# src: str = "../test/hello.webm"
# src: str = "https://www.youtube.com/watch?v=tyloC0e-Tqk"
# dest: str = "/home/sakamoto/Public/test"
class Vid2Sheet:
    """Turn a video into a PDF made of its significantly-changed frames.

    The pipeline (see :meth:`run`) is:

    1. :meth:`check`   - resolve the source (downloading YouTube URLs) and
       open it with OpenCV.
    2. :meth:`capture` - save every frame that differs enough from the
       previous one into ``raw_dir``.
    3. :meth:`stitch`  - lay the saved frames out three per page into
       ``stitch_dir``.
    4. :meth:`convert` - bundle the pages into ``<dest>/<title>.pdf``.

    Fatal conditions (unopenable video, nothing captured, nothing to convert)
    are logged and abort with ``SystemExit(1)``.
    """

    # Physical page sizes in inches, as (width, height).
    PAPER_SIZES = {
        "letter": (8.5, 11),
        "A4": (8.27, 11.69),
        "legal": (8.5, 14),
        "tabloid": (11, 17),
    }

    # Number of captured frames placed on each stitched page.
    FRAMES_PER_PAGE = 3

    _YOUTUBE_PATTERN = re.compile(
        r"(https?://)?(www\.)?"
        r"(youtube\.com/watch\?v=|youtu\.be/)"
        r"[a-zA-Z0-9_-]{11}",
        re.IGNORECASE,
    )
    _PLAYLIST_PATTERN = re.compile(
        r"(https?://)?(www\.)?(youtube\.com/playlist\?list=)[a-zA-Z0-9_-]+",
        re.IGNORECASE,
    )

    def __init__(
        self, src, dest, img_format: str, use_tempfile=True, ignore_temp=False
    ):
        """Prepare working directories for one conversion.

        Args:
            src: Path to a local video, or a YouTube video/playlist URL.
            dest: Directory that receives the final PDF.
            img_format: File extension (without dot) used for intermediate
                images.
            use_tempfile: When true, work inside a
                ``tempfile.TemporaryDirectory``; otherwise inside
                ``<dest>/tmp``.
            ignore_temp: Consulted by ``__del__`` when ``<dest>/tmp`` is the
                working directory (see the note there).
        """
        self.src = src
        self.dest = dest
        self.img_format = img_format
        # Stored so __del__ uses this instance's setting rather than a module
        # global that only exists when the file is run as a script.
        self.ignore_temp = ignore_temp
        self.download = Download()
        self.misc = Misc()
        self.dest_temp_dir = os.path.join(dest, "tmp")
        self.define_temp = (
            tempfile.TemporaryDirectory() if use_tempfile else self.dest_temp_dir
        )
        self.temp_dir = (
            self.define_temp.name
            if isinstance(self.define_temp, tempfile.TemporaryDirectory)
            else self.define_temp
        )
        self.stitch_dir = os.path.join(self.temp_dir, "stitch")
        self.video_dir = os.path.join(self.temp_dir, "video")
        self.raw_dir = os.path.join(self.temp_dir, "raw")
        # The destination must exist for the final PDF. <dest>/tmp is only
        # created when it is the actual working directory (temp_dir), so
        # tempfile mode no longer leaves an empty "tmp" folder behind.
        os.makedirs(self.dest, exist_ok=True)
        for directory in (
            self.temp_dir,
            self.stitch_dir,
            self.video_dir,
            self.raw_dir,
        ):
            os.makedirs(directory, exist_ok=True)
        self.previous_frame = None
        log.info(f"Source: {self.src}")
        log.info(f"Destination: {self.dest}")
        log.debug(f"Use Tempfile? {use_tempfile}")
        log.debug(f"Ignore temp delete? {ignore_temp}")
        log.debug(f"Verbose? {verbose}")
        log.debug(f"Temporary Directory: {self.temp_dir}")
        log.debug(f"Stitch Directory: {self.stitch_dir}")
        log.debug(f"Video Directory: {self.video_dir}")

    def __del__(self):
        """Remove the working directory when the instance is collected."""
        define_temp = getattr(self, "define_temp", None)
        if define_temp is None:
            # __init__ failed before a working directory was chosen.
            return
        if isinstance(define_temp, tempfile.TemporaryDirectory):
            log.debug(f"Deleting: {define_temp.name}")
            define_temp.cleanup()
            return
        try:
            # NOTE(review): <dest>/tmp is deleted only when ignore_temp is
            # true. The "Ignore temp delete?" log label suggests the opposite
            # polarity may have been intended - confirm before changing.
            if getattr(self, "ignore_temp", False):
                log.debug(f"Deleting: {self.dest_temp_dir}")
                shutil.rmtree(self.dest_temp_dir)
        except Exception as e:
            # Best effort: cleanup failures must not propagate from __del__.
            log.error(e)

    @staticmethod
    def _natural_key(name):
        """Return a sort key that orders embedded numbers numerically.

        Plain string sorting puts ``img_1000`` before ``img_101``; splitting
        on digit runs keeps captures and pages in creation order past 999.
        """
        # re.split with a capture group alternates text, digits, text, ...,
        # so odd positions are always the digit runs.
        return [
            int(token) if index % 2 else token
            for index, token in enumerate(re.split(r"([0-9]+)", name))
        ]

    @classmethod
    def _classify_source(cls, src):
        """Return ``"video"``, ``"playlist"`` or ``"file"`` for *src*."""
        if cls._YOUTUBE_PATTERN.match(src):
            return "video"
        if cls._PLAYLIST_PATTERN.match(src):
            return "playlist"
        return "file"

    @classmethod
    def _resolve_paper_size(cls, name):
        """Return ``(width, height)`` in inches for *name*, or ``None``.

        The lookup ignores case and surrounding whitespace, so ``"a4"`` and
        ``"Letter"`` resolve like ``"A4"`` and ``"letter"``.
        """
        if not isinstance(name, str):
            return None
        wanted = name.strip().casefold()
        for key, dimensions in cls.PAPER_SIZES.items():
            if key.casefold() == wanted:
                return dimensions
        return None

    def _first_downloaded_file(self):
        """Return the first regular file in ``video_dir`` (sorted by name).

        Raises:
            SystemExit: If the directory contains no regular file.
        """
        paths = (
            os.path.join(self.video_dir, entry)
            for entry in sorted(os.listdir(self.video_dir))
        )
        files = [path for path in paths if os.path.isfile(path)]
        if not files:
            log.error(f"No downloaded video found in {self.video_dir}")
            raise SystemExit(1)
        if len(files) > 1:
            log.debug(f"{len(files)} files downloaded; using {files[0]}")
        return files[0]

    def run(self):
        """Execute the full pipeline: check, capture, stitch, convert."""
        self.check()
        self.capture()
        self.stitch(paper_size)
        self.convert()

    def check(self):
        """Resolve ``self.src`` to a local video and open it.

        YouTube video and playlist URLs are downloaded into ``video_dir``
        first. Sets ``self.video``, ``self.title`` and ``self.total_frames``.

        Raises:
            SystemExit: If nothing was downloaded or OpenCV cannot open the
                video.
        """
        kind = self._classify_source(self.src)
        if kind == "video":
            log.info("Detected YouTube link")
            self.download.video(self.src, self.video_dir)
            self.src = self._first_downloaded_file()
        elif kind == "playlist":
            log.info("Detected YouTube playlist")
            self.download.playlist(self.src, self.video_dir)
            # NOTE(review): only the first downloaded file is processed;
            # assumes the downloader writes files directly into video_dir.
            self.src = self._first_downloaded_file()
        self.video = cv2.VideoCapture(self.src)
        self.title = os.path.splitext(os.path.basename(self.src))[0]
        if not self.video.isOpened():
            log.error(f"Could not open video: {self.src}")
            # Abort here: continuing would only fail later with a less
            # helpful "no images" error.
            raise SystemExit(1)
        self.total_frames = int(self.video.get(cv2.CAP_PROP_FRAME_COUNT))
        log.debug(f"Total frames: {self.total_frames}")

    def capture(self, change_threshold=12500000):
        """Save each frame that differs significantly from its predecessor.

        The first frame is always saved. A later frame is saved when the sum
        of absolute grayscale differences to the previous frame exceeds
        *change_threshold*. Images go to ``raw_dir`` as
        ``img_<n>.<img_format>``. The capture and the progress bar are
        released even if an error interrupts the loop.

        Args:
            change_threshold: Minimum summed pixel difference that counts as
                a significant change.
        """
        count = 0
        frame_count = 0
        previous_frame = None
        log.info("Analyzing and capturing caught frames; this may take a while...")
        self.pbar_capture = tqdm(
            total=self.total_frames,
            desc="Capturing significant changes",
            bar_format="{l_bar}{bar} | {n_fmt}/{total_fmt} frames | {rate_fmt} | {elapsed} elapsed",
        )
        try:
            while True:
                ret, current_frame = self.video.read()
                if not ret:
                    break
                gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
                if previous_frame is None:
                    log.debug(f"Starting at frame {frame_count}")
                    significant = True
                else:
                    frame_diff = cv2.absdiff(previous_frame, gray_current)
                    # Accumulate in 64 bits so very large frames cannot wrap
                    # around on platforms with a 32-bit default integer.
                    diff_sum = np.sum(frame_diff, dtype=np.uint64)
                    significant = diff_sum > change_threshold
                if significant:
                    img_output = os.path.join(
                        self.raw_dir, f"img_{count:03}.{self.img_format}"
                    )
                    if not cv2.imwrite(img_output, current_frame):
                        log.error(f"Could not write image: {img_output}")
                    count += 1
                frame_count += 1
                previous_frame = gray_current
                self.pbar_capture.update(1)
        finally:
            self.pbar_capture.close()
            self.video.release()
        log.info("Analysis complete")

    def stitch(self, paper_size="A4", dpi=300):
        """Lay captured frames out three per page and save the pages.

        Each frame is scaled to fit one third of the page (shrunk a further
        5 %), centred horizontally and stacked top to bottom on a white
        canvas. Pages are written to ``stitch_dir`` as
        ``stitched_page_<n>.<img_format>``.

        Args:
            paper_size: Key of ``PAPER_SIZES`` (case-insensitive). Unknown
                names are reported and replaced by ``"A4"``.
            dpi: Resolution used to convert inches to pixels.

        Raises:
            SystemExit: If ``raw_dir`` contains no images.
        """
        log.info("Attempting to stitch by three for every group...")
        log.info(f"Chosen paper size: {paper_size}")
        dimensions = self._resolve_paper_size(paper_size)
        if dimensions is None:
            log.error(f"Unsupported paper size '{paper_size}'. Using default 'A4'.")
            dimensions = self.PAPER_SIZES["A4"]
        width_inches, height_inches = dimensions
        letter_width = int(width_inches * dpi)
        letter_height = int(height_inches * dpi)
        image_files = sorted(os.listdir(self.raw_dir), key=self._natural_key)
        if not image_files:
            log.error("Found no images for stitching")
            raise SystemExit(1)
        per_page = self.FRAMES_PER_PAGE
        available_height = letter_height // per_page
        group_starts = range(0, len(image_files), per_page)
        for page_number, group_index in enumerate(group_starts, start=1):
            canvas = np.full((letter_height, letter_width, 3), 255, dtype=np.uint8)
            group_files = image_files[group_index : group_index + per_page]
            for i, img_file in enumerate(group_files):
                img_path = os.path.join(self.raw_dir, img_file)
                img = cv2.imread(img_path)
                if img is None:
                    log.debug(f"Skipping unreadable image: {img_path}")
                    continue
                img_height, img_width = img.shape[:2]
                scale_factor = min(
                    letter_width / img_width, available_height / img_height
                )
                # Never let an extreme aspect ratio collapse a side to zero
                # pixels, which cv2.resize rejects.
                resized_img_width = max(1, int(img_width * scale_factor * 0.95))
                resized_img_height = max(1, int(img_height * scale_factor * 0.95))
                resized_img = cv2.resize(img, (resized_img_width, resized_img_height))
                x_offset = (letter_width - resized_img_width) // 2
                # Rows are spaced at 90 % of a slot height.
                y_offset = int(
                    (available_height - resized_img_height) // 2
                    + i * available_height * 0.9
                )
                canvas[
                    y_offset : y_offset + resized_img_height,
                    x_offset : x_offset + resized_img_width,
                ] = resized_img
            output_path = os.path.join(
                self.stitch_dir,
                f"stitched_page_{page_number:03}.{self.img_format}",
            )
            if not cv2.imwrite(output_path, canvas):
                log.error(f"Could not write image: {output_path}")
            log.debug(f"Saved stitched image for page {page_number}: {output_path}")
        log.info("Stitching done")

    def convert(self):
        """Combine the stitched pages into ``<dest>/<title>.pdf``.

        Pages are taken from ``stitch_dir`` in natural (numeric) filename
        order. The file is named after ``self.title``, or ``output`` when no
        title is set.

        Raises:
            SystemExit: If ``stitch_dir`` contains no files.
        """
        log.info("Attempting to convert to pdf")
        paths = (
            os.path.join(self.stitch_dir, entry)
            for entry in os.listdir(self.stitch_dir)
        )
        imgs = sorted(
            (path for path in paths if os.path.isfile(path)),
            key=self._natural_key,
        )
        log.debug(f"images in {self.stitch_dir} to convert: {imgs}")
        if not imgs:
            log.error("No images found for PDF conversion.")
            raise SystemExit(1)
        file_name = getattr(self, "title", None) or "output"
        pdf_path = os.path.join(self.dest, f"{file_name}.pdf")
        os.makedirs(self.dest, exist_ok=True)
        with open(pdf_path, "wb") as f:
            f.write(img2pdf.convert(imgs))
        log.info(f"Saved file to {pdf_path}")
if __name__ == "__main__":
    # Script entry point: build the converter from the settings imported from
    # utils and run the whole pipeline.
    # NOTE(review): alt_temp is forwarded as use_tempfile - confirm that a
    # true alt_temp is really meant to select the system temp directory.
    vid2sheet = Vid2Sheet(
        src,
        dest,
        img_format,
        use_tempfile=alt_temp,
        ignore_temp=ignore_temp,
    )
    vid2sheet.run()