
import os |
import time |
from functools import reduce |
from threading import Thread |
from PIL import Image |
class MosaicMaker(object):
    """Build a photo mosaic of a target image from a directory of tile images.

    Every file in ``db_path`` is opened, reduced to a "key" (average RGB,
    average gray level, or a 64-bit average hash, depending on ``mode``) and
    resized to the tile size. The target image is then cut into tiles; each
    tile is replaced by the library image whose key is closest to the tile's
    key, and the result is blended with the original image and saved.

    Args:
        db_path: Directory containing the tile library.
        aim_path: Path of the target image to reproduce.
        out_path: Path the finished mosaic is saved to.
        sub_width: Initial tile width (recomputed by ``make``).
        sub_height: Initial tile height (recomputed by ``make``).
        min_unit: Scale factor applied to the tile aspect ratio.
        mode: Matching strategy: ``"RGB"``, ``"gray"`` or ``"hash"``.
        default_w: Width the target is resized to when its own size is unsuitable.
        default_h: Height the target is resized to when its own size is unsuitable.
    """

    class __SubTask:
        """Unit of work for the mosaic threads: match and paste one tile."""

        def __init__(self, n, cur_sub_im, new_im, m, box):
            self.n = n  # sequence number of the tile
            self.cur_sub_im = cur_sub_im  # region cropped from the target image
            self.new_im = new_im  # canvas shared by all tasks
            self.m = m  # owning MosaicMaker
            self.box = box  # (left, upper, right, lower) of the tile on the canvas

        def work(self):
            """Find the best library image for this tile and paste it onto the canvas."""
            # Key of the cropped region (gray level, average RGB or hash, per mode).
            cur_sub_key = self.m.cal_key(self.cur_sub_im)
            # Library image whose key is closest to it.
            fit_sub = self.m.find_key(cur_sub_key)
            self.new_im.paste(fit_sub, self.box)

    class __ReadTask:
        """Unit of work for the loader threads: load one library image."""

        def __init__(self, n, full_path, fin_w, fin_h, m):
            self.n = n  # sequence number of the file
            self.full_path = full_path
            self.fin_w = fin_w  # final tile width
            self.fin_h = fin_h  # final tile height
            self.m = m  # owning MosaicMaker

        def read(self):
            """Open the file, compute its key and store the resized tile in the library.

            Files that Pillow cannot open or decode are reported and skipped so
            that one bad file does not abort the rest of the thread's batch.
            """
            print("开始读取第%d张图片" % self.n)
            try:
                # The context manager releases the file handle once the tile is built.
                with Image.open(self.full_path) as cur:
                    # The key is computed on the full-size image, before scaling.
                    key = self.m.cal_key(cur)
                    # LANCZOS is the same filter as ANTIALIAS, which Pillow 10 removed.
                    tile = cur.resize((self.fin_w, self.fin_h), Image.LANCZOS)
            except OSError as err:
                print("跳过无法读取的文件: %s (%s)" % (self.full_path, err))
                return
            self.m.get_all_img()[key] = tile

    def __init__(self, db_path, aim_path, out_path, sub_width=64, sub_height=64, min_unit=5, mode="RGB",
                 default_w=1600, default_h=1280):
        self.__db_path = db_path
        self.__aim_path = aim_path
        self.__out_path = out_path
        self.__sub_width = sub_width
        self.__sub_height = sub_height
        self.__min_unit = min_unit
        self.__mode = mode
        self.__default_w = default_w
        self.__default_h = default_h
        # key -> resized tile image, filled by __read_all_img
        self.__all_img = dict()

    def make(self):
        """Public entry point: load the library, build the mosaic, show and save it."""
        aim_im = Image.open(self.__aim_path)
        aim_width, aim_height = aim_im.size
        print("计算子图尺寸")
        if not self.__divide_sub_im(aim_width, aim_height):
            # The target's own size gave an unusable tile size: use the default canvas.
            print("使用默认尺寸")
            aim_im = aim_im.resize((self.__default_w, self.__default_h), Image.LANCZOS)
            aim_width, aim_height = aim_im.size
        print("读取图库")
        start = time.time()
        self.__read_all_img(self.__db_path, self.__sub_width, self.__sub_height)
        print("耗时:%f秒" % (time.time() - start))
        self.__core(aim_im, aim_width, aim_height)

    def __core(self, aim_im, width, height):
        """Replace every tile of ``aim_im`` with its best match, then blend, show and save."""
        new_im = Image.new("RGB", (width, height))
        sub_w = self.__sub_width
        sub_h = self.__sub_height
        # Tiles per row and per column are computed separately so that grids
        # that are not square are covered exactly.
        cols = width // sub_w
        rows = height // sub_h
        print("源文件尺寸为:(w:%d h:%d)" % (width, height))
        print("子图的尺寸为:(w:%d h:%d)" % (sub_w, sub_h))
        print("w:%d" % cols)
        print("开始拼图,请稍等...")
        start = time.time()
        n = 1
        thread_list = []
        # One thread per column of tiles.
        for i in range(cols):
            task_list = []
            left = i * sub_w
            for j in range(rows):
                up = j * sub_h
                box = (left, up, left + sub_w, up + sub_h)
                cur_sub_im = aim_im.crop(box)
                task_list.append(self.__SubTask(n, cur_sub_im, new_im, self, box))
                n += 1
            thread_list.append(Thread(target=self.__sub_mission, args=(task_list,)))
        for t in thread_list:
            t.start()
        for t in thread_list:
            t.join()
        print("拼图完成,共耗时%f秒" % (time.time() - start))
        # Blend the mosaic with the original to improve the look. Image.blend
        # requires both images to share a mode, and the canvas is always RGB.
        if aim_im.mode != new_im.mode:
            aim_im = aim_im.convert(new_im.mode)
        new_im = Image.blend(new_im, aim_im, 0.35)
        new_im.show()
        new_im.save(self.__out_path)

    @staticmethod
    def __sub_mission(missions):
        """Thread target for the mosaic step: run every tile task in order."""
        for task in missions:
            task.work()

    def __divide_sub_im(self, width, height):
        """Derive the tile size from the image size.

        The tile keeps the aspect ratio ``width:height`` reduced by a divisor
        ``g`` and scaled by ``min_unit``. ``g`` is the greatest common divisor
        of the two sides, replaced by 320 when it equals the width. When the
        divisor is below 20 the default canvas size is used instead.

        Returns:
            True when the tile size was derived from the given size, False
            when the default size had to be used (the caller then resizes the
            target image to the default size).
        """
        flag = True
        g = self.__gcd(width, height)
        if g < 20:
            flag = False
            width = self.__default_w
            height = self.__default_h
            g = 320

        if g == width:
            g = 320
        # max(1, ...) keeps the tile size non-zero for images smaller than the
        # divisor; a zero tile size would make __core divide by zero.
        self.__sub_width = self.__min_unit * max(1, width // g)
        self.__sub_height = self.__min_unit * max(1, height // g)
        return flag

    def __read_all_img(self, db_path, fin_w, fin_h):
        """Load every regular file in ``db_path`` into the library using 5 threads.

        Args:
            db_path: Directory to scan (not recursive).
            fin_w: Final tile width.
            fin_h: Final tile height.
        """
        buckets = [[] for _ in range(5)]
        n = 1
        for file_name in os.listdir(db_path):
            # os.path.join keeps this portable; a hard-coded "\\" only works on Windows.
            full_path = os.path.join(db_path, file_name)
            if os.path.isfile(full_path):
                buckets[n % 5].append(self.__ReadTask(n, full_path, fin_w, fin_h, self))
                n += 1
        workers = []
        for bucket in buckets:
            t = Thread(target=self.__read_img, args=(bucket,))
            t.start()
            workers.append(t)
        for t in workers:
            t.join()

    @staticmethod
    def __read_img(tasks):
        """Thread target for the loading step: run every read task in order."""
        for task in tasks:
            task.read()

    def cal_key(self, im):
        """Return the key of ``im`` for the configured mode ("" for an unknown mode)."""
        if self.__mode == "RGB":
            return self.__cal_avg_rgb(im)
        elif self.__mode == "gray":
            return self.__cal_gray(im)
        elif self.__mode == "hash":
            return self.__cal_hash(im)
        else:
            return ""

    def find_key(self, im):
        """Return the library image closest to the given key ("" for an unknown mode).

        Despite its name, ``im`` is a key as produced by ``cal_key``.
        """
        if self.__mode == "RGB":
            return self.__find_by_rgb(im)
        elif self.__mode == "gray":
            return self.__find_by_gray(im)
        elif self.__mode == "hash":
            return self.__find_by_hash(im)
        else:
            return ""

    @staticmethod
    def __cal_gray(im):
        """Return the integer (floored) average gray level of ``im``."""
        if im.mode != "L":
            im = im.convert("L")
        return sum(im.getdata()) // (im.size[0] * im.size[1])

    @staticmethod
    def __cal_avg_rgb(im):
        """Return the average colour of ``im`` as the string ``"r-g-b"``.

        Each channel is written in fixed-point notation: ``str(float)`` would
        switch to exponent form (e.g. ``7.5e-05``) for very small averages, and
        the extra ``-`` would break the ``split("-")`` done by the RGB search.
        """
        if im.mode != "RGB":
            im = im.convert("RGB")
        pix = im.load()
        width, height = im.size
        sum_r, sum_g, sum_b = 0, 0, 0
        for i in range(width):
            for j in range(height):
                r, g, b = pix[i, j]
                sum_r += r
                sum_g += g
                sum_b += b
        # Divide by the real pixel count; the guard only matters for an empty image.
        count = max(width * height, 1)
        return "%.6f-%.6f-%.6f" % (sum_r / count, sum_g / count, sum_b / count)

    def __cal_hash(self, im):
        """Return a 64-character average hash of ``im`` made of "0" and "1".

        The image is reduced to 8x8 gray pixels; each pixel contributes "0"
        when it is darker than the average gray level and "1" otherwise.
        """
        im = im.resize((8, 8), Image.LANCZOS)
        im = im.convert("L")
        avg_gray = self.__cal_gray(im)
        return "".join("0" if px < avg_gray else "1" for px in im.getdata())

    @staticmethod
    def __gcd(a, b):
        """Return the greatest common divisor of ``a`` and ``b`` (Euclid's algorithm)."""
        while a % b:
            a, b = b, a % b
        return b

    def __closest(self, distance):
        """Return the library image whose key minimises ``distance(key)``.

        The first key wins on ties, in the dictionary's iteration order.

        Raises:
            KeyError: If the library is empty.
        """
        library = self.__all_img
        if not library:
            raise KeyError("image library is empty")
        return library[min(library, key=distance)]

    def __find_by_gray(self, gray):
        """Return the library image whose gray level is closest to ``gray``."""
        return self.__closest(lambda key: abs(key - gray))

    def __find_by_hash(self, sub_hash):
        """Return the library image whose hash differs from ``sub_hash`` in the fewest bits."""
        return self.__closest(lambda key: self.__dif_num(sub_hash, key))

    @staticmethod
    def __dif_num(hash1, hash2):
        """Return the number of differing positions among the first 64 characters."""
        return sum(1 for i in range(64) if hash1[i] != hash2[i])

    def __find_by_rgb(self, sub_rgb):
        """Return the library image whose average colour is closest to ``sub_rgb``.

        Distance is the sum of the absolute per-channel differences, which can
        be as large as 765, so no fixed upper bound is assumed for the best match.
        """
        # Parse the target once instead of on every comparison.
        sub_r, sub_g, sub_b = (float(part) for part in sub_rgb.split("-"))

        def distance(key):
            src_r, src_g, src_b = key.split("-")
            return abs(sub_r - float(src_r)) + abs(sub_g - float(src_g)) + abs(sub_b - float(src_b))

        return self.__closest(distance)

    def get_all_img(self):
        """Return the key -> tile dictionary (the live object, not a copy)."""
        return self.__all_img
|
|
if __name__ == '__main__':
    # Script entry point: rebuild 1.jpg as a mosaic of the images found in
    # E:\image and write the result to 2.jpg.
    maker = MosaicMaker(db_path="E:\\image", aim_path="1.jpg", out_path="2.jpg")
    maker.make()



