python手撕分水岭算法
1 分水岭算法实现
主要思路就是:
- 利用一个优先队列与有序队列(有序队列其实可以不用)。优先队列是按像素的灰度值排列的,灰度值低的先被淹。
- 通过统计像素的附近的点的标记种类个数来确认当前像素点的标记,并把该像素的四邻域入队。
- 注意要使用梯度图像,这样使得各个部分蔓延的速度一样。
import cv2
import numpy as np
import matplotlib.pyplot as plt
import random
import plotly.express as px
from queue import Queue,PriorityQueue
import sysclass WATERSHED:def __init__(self,img,markers):if img.ndim!=2:raise ValueError("请输入灰度图")self.img = img# 求图像梯度幅值self.gradient()self.markers = markers.astype(np.int32)self.img_highest = np.max(self.img)self.img_lowest = np.min(self.img)self.WSHED = -1# 分水岭self.IN_QUEUE = -2#已入队,但未赋予标记# 根据梯度幅值图的灰度最大最小值创建有序队列self.ordered_queue = [[] for _ in range(self.img_highest-self.img_lowest+1)] def gradient(self):# self.img = cv2.Canny(self.img,80,150)d_x= cv2.Sobel(self.img,cv2.CV_64F,1,0,ksize=3)d_y= cv2.Sobel(self.img,cv2.CV_64F,0,1,ksize=3)value = cv2.magnitude(d_x,d_y)self.img = cv2.convertScaleAbs(value)def ordered_queue_push(self,i,j):gray_leve = self.img[i][j]self.ordered_queue[gray_leve-self.img_lowest].append([i,j])self.markers[i][j] = self.IN_QUEUEdef PriorityQueue_push(self,water_stage):# 将等于水位高度的像素点全部入优先队列for pix_coord in self.ordered_queue[water_stage-self.img_lowest]:self.wait_water_queue.put((water_stage,pix_coord))def wait_water_queue_push(self,x,y):self.wait_water_queue.put((self.img[x][y],[x,y]))self.markers[x][y] = self.IN_QUEUEdef get_label(self,pix_coord:list,water_stage):x,y = pix_coordneighbour_pix_values = {}for i in [-1,0,1]:for j in [-1,0,1]:pix_x,pix_y = x+i,y+jif pix_x<0 or pix_y<0 or pix_x >=self.img.shape[0] \or pix_y >=self.img.shape[1] :continue# 必须以四邻域搜索,八邻域会越过边界if abs(i+j)==1:if neighbour_pix_values.get(self.markers[pix_x][pix_y])!=None:neighbour_pix_values[self.markers[pix_x][pix_y]]+=1else:neighbour_pix_values[self.markers[pix_x][pix_y]]=1if self.markers[pix_x][pix_y]==0:#不在队列且无标签if self.img[pix_x][pix_y]<=water_stage:#低于水位,入优先队列self.wait_water_queue_push(pix_x,pix_y)else:#高于水位,还淹不到self.ordered_queue_push(pix_x,pix_y)pix_values = list(neighbour_pix_values)#去掉分水岭、入队标记和未标记区域值pix_values = [value for value in pix_values \if value>0 and value!=self.IN_QUEUE]if len(pix_values)==1: self.markers[x][y]=pix_values[0]else :self.markers[x][y]=self.WSHED def run(self):for i in range(self.img.shape[0]):for j in range(self.img.shape[1]):#已标记区域全部入队if self.markers[i][j] >0:gray_leve = self.img[i][j]self.ordered_queue[gray_leve-self.img_lowest].append([i,j])for water_stage in range(self.img_lowest,self.img_highest+1):sys.stdout.write("水位高度{}".format(water_stage))sys.stdout.flush()self.wait_water_queue = PriorityQueue()#将水位为water_stage的全部入优先队列self.PriorityQueue_push(water_stage)#每轮循环结束小于等于water_stage的像素值都得清空while self.wait_water_queue.qsize() > 0:pix_coord = self.wait_water_queue.get()[1]self.get_label(pix_coord,water_stage)# 动画plt.clf()plt.imshow(self.markers)plt.pause(0.001)return self.markers
2 获取初始标记
如果如opencv官方例程那样,用距离变换来确认标记,非常容易过分割,所以我们可以通过手动进行标记。
可以使用opencv的鼠标事件实现标记。
import cv2
import numpy as npclass draw_markers:def __init__(self,img,resize_rate:int=1):if img.ndim == 2:self.img = cv2.cvtColor(img,cv2.COLOR_GRAY2BGR)else:self.img = imgself. resize_rate = resize_rateself.r,self.w = self.img.shape[:2]self.markers = np.zeros_like(self.img)# 放大self.img = cv2.resize(self.img,(self.w*resize_rate,self.r*resize_rate))self.markers = cv2.resize(self.markers,(self.w*resize_rate,self.r*resize_rate))self.cls_num =0#标记颜色列表self.color_list = [(255,0,0),(0,255,0),(0,0,255),(255,255,0),(255,0,255),(0,255,255)]self.drawing = Falsedef mark(self,event, x, y,flags, param):if event==cv2.EVENT_LBUTTONDOWN:self.drawing = Trueif event == cv2.EVENT_LBUTTONUP:self.drawing = Falseif self.drawing:cv2.circle(self.img,(x,y),radius=1,color=self.color_list[self.cls_num],thickness=-1)cv2.circle(self.markers,(x,y),radius=1,color=self.color_list[self.cls_num],thickness=-1)def draw(self):cv2.namedWindow('image')cv2.setMouseCallback('image',self.mark)while True:cv2.imshow('image',self.img)k = cv2.waitKey(1) & 0xFFif k==ord('c'):self.cls_num +=1print("第{}类".format(self.cls_num+1))if k==27:breakcv2.destroyAllWindows()result = cv2.cvtColor(self.markers,cv2.COLOR_BGR2GRAY)#不能用插值,会改变标签值result = result[::self.resize_rate,::self.resize_rate]cv2.imwrite('./markers.png',self.img)return result
3 主函数
- 首先获得标记
- 开运算
- 分水岭
要想分割的好,要避免不同区域的连通,相同区域的分裂。锐化和高斯模糊感觉用了效果不好。
img = cv2.imread('test.png',0)
#标记图像
DM = draw_markers(img,1)
markers = DM.draw()
#开运算
kernel = np.ones((5,5),dtype=np.uint8)
img = cv2.morphologyEx(img,cv2.MORPH_OPEN,kernel,iterations=1)
# img = cv2.erode(img,kernel=np.ones((5,5)),iterations=2)
# img = cv2.GaussianBlur(img,(5,5),sigmaX=0)# #锐化
# kernel = np.array([[1, 1, 1], [1, -8, 1], [1, 1, 1]], dtype=np.float32)
# imgLaplacian = cv2.filter2D(img, cv2.CV_32F, kernel)
# sharp =np.float32(img)
# imgResult = sharp - imgLaplacian
# imgResult = np.clip(imgResult,0,255)
# imgResult = np.uint8(imgResult)
#开始分水岭算法
WSHED= watershed.WATERSHED(img,markers)
result = WSHED.run()
#将所有的分水岭转为255
result[result==-1]==255
result = result.astype(np.uint8)
imshow(result,'result.png')
4 实验结果
4.1 标记图像
4.2 分割结果
对比opencv官方例程,过分割现象减少。
opencv例程结果如下:
5 参考文献
COLOR IMAGE SEGMENTATION
Image Segmentation with Distance Transform and Watershed Algorithm
OpenCV cv::watershed 分水岭算法论文解读以及numpy实现