import warnings
from collections import namedtuple

import cv2
import numpy as np
import pyclipper
import torch
from shapely.geometry import Polygon

warnings.filterwarnings('ignore')


def _to_python_scalar(value):
    """Return ``value.item()`` when ``value`` exposes ``item`` (tensor / NumPy scalar), else ``value``."""
    return value.item() if hasattr(value, 'item') else value


def iou_rotate(box_a, box_b, method='union'):
    """Compute the overlap ratio of the minimum-area rectangles of two boxes.

    The intersection is taken between ``cv2.minAreaRect`` of each box, while
    the individual areas come from ``cv2.contourArea`` of the boxes as given.

    Args:
        box_a, box_b: point arrays accepted by ``cv2.minAreaRect`` and
            ``cv2.contourArea`` (the evaluator in this file passes float32).
        method: ``'union'`` for intersection / union, ``'intersection'`` for
            intersection / min(area_a, area_b).

    Returns:
        The ratio, or 0 when the rectangles do not intersect or an area
        involved is zero.

    Raises:
        NotImplementedError: for any other ``method`` (only reached when the
            rectangles actually overlap, as before).
    """
    rect_a = cv2.minAreaRect(box_a)
    rect_b = cv2.minAreaRect(box_b)
    status, region = cv2.rotatedRectangleIntersection(rect_a, rect_b)
    if status == 0:
        return 0

    inter_area = cv2.contourArea(region)
    area_a = cv2.contourArea(box_a)
    area_b = cv2.contourArea(box_b)
    union_area = area_a + area_b - inter_area
    if union_area == 0 or inter_area == 0:
        return 0

    if method == 'union':
        return inter_area / union_area
    if method == 'intersection':
        smaller_area = min(area_a, area_b)
        # A degenerate box has zero contour area; report no overlap instead
        # of raising ZeroDivisionError.
        return 0 if smaller_area == 0 else inter_area / smaller_area
    raise NotImplementedError


class SegDetectorRepresenter():
    """Turn a predicted probability map into boxes or polygons with scores."""

    def __init__(self, thresh=0.3, box_thresh=0.7, max_candidates=1000, unclip_ratio=1.5):
        self.min_size = 3
        self.thresh = thresh
        self.box_thresh = box_thresh
        self.max_candidates = max_candidates
        self.unclip_ratio = unclip_ratio

    def __call__(self, batch, pred, is_output_polygon=False, height=None, width=None):
        """Extract detections for every image in a batch.

        Args:
            batch: not read by this method; kept for interface compatibility.
            pred: tensor or array indexed as ``pred[:, 0, :, :]``; channel 0
                is used as the probability map of each image.
            is_output_polygon: if true use ``polygons_from_bitmap``, otherwise
                ``boxes_from_bitmap``.
            height, width: size the coordinates are rescaled to; default to
                the spatial size of ``pred``.

        Returns:
            ``(boxes_batch, scores_batch)``: two lists with one entry per image.
        """
        pred = pred[:, 0, :, :]
        segmentation = self.binarize(pred)
        batch_size = pred.size(0) if isinstance(pred, torch.Tensor) else pred.shape[0]
        if height is None:
            height = pred.shape[1]
        if width is None:
            width = pred.shape[2]

        extract = self.polygons_from_bitmap if is_output_polygon else self.boxes_from_bitmap
        boxes_batch = []
        scores_batch = []
        for batch_index in range(batch_size):
            boxes, scores = extract(pred[batch_index], segmentation[batch_index], width, height)
            boxes_batch.append(boxes)
            scores_batch.append(scores)
        return boxes_batch, scores_batch

    def binarize(self, pred) -> np.ndarray:
        """Threshold ``pred`` with ``self.thresh``.

        The result has the type produced by ``pred > thresh``, so a tensor
        input yields a tensor despite the annotation.
        """
        return pred > self.thresh

    def polygons_from_bitmap(self, pred, _bitmap, dest_width, dest_height):
        """Extract unclipped polygons from one binarized map.

        Args:
            pred: probability map (H, W), tensor or NumPy array.
            _bitmap: binarized map (H, W) matching ``pred``.
            dest_width, dest_height: size the coordinates are rescaled to.

        Returns:
            ``(boxes, scores)``: list of (n, 2) arrays and list of scores.
        """
        assert len(_bitmap.shape) == 2
        # Accept NumPy input too, consistent with __call__ and boxes_from_bitmap.
        if isinstance(pred, torch.Tensor):
            bitmap = _bitmap.cpu().numpy()
            pred = pred.cpu().detach().numpy()
        else:
            bitmap = _bitmap
        height, width = bitmap.shape
        dest_width = _to_python_scalar(dest_width)
        dest_height = _to_python_scalar(dest_height)

        boxes = []
        scores = []
        contours, _ = cv2.findContours((bitmap * 255).astype(np.uint8), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours[:self.max_candidates]:
            epsilon = 0.005 * cv2.arcLength(contour, True)
            approx = cv2.approxPolyDP(contour, epsilon, True)
            points = approx.reshape((-1, 2))
            if points.shape[0] < 4:
                continue

            score = self.box_score_fast(pred, contour.squeeze(1))
            if self.box_thresh > score:
                continue

            box = self.unclip(points, unclip_ratio=self.unclip_ratio)
            # Keep only results made of exactly one polygon: several are
            # ambiguous, none cannot be measured by cv2.minAreaRect.
            if len(box) != 1:
                continue
            box = box.reshape(-1, 2)

            _, sside = self.get_mini_boxes(box.reshape((-1, 1, 2)))
            if sside < self.min_size + 2:
                continue

            box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
            box[:, 1] = np.clip(np.round(box[:, 1] / height * dest_height), 0, dest_height)
            boxes.append(box)
            scores.append(score)
        return boxes, scores

    def boxes_from_bitmap(self, pred, _bitmap, dest_width, dest_height):
        """Extract unclipped minimum-area boxes from one binarized map.

        Args:
            pred: probability map (H, W), tensor or NumPy array.
            _bitmap: binarized map (H, W) matching ``pred``.
            dest_width, dest_height: size the coordinates are rescaled to.

        Returns:
            ``(boxes, scores)``: int64 array (num_contours, 4, 2) and float32
            array (num_contours,). Contours whose short side is below 2 keep
            an all-zero box and a zero score.
        """
        assert len(_bitmap.shape) == 2
        if isinstance(pred, torch.Tensor):
            bitmap = _bitmap.cpu().numpy()
            pred = pred.cpu().detach().numpy()
        else:
            bitmap = _bitmap
        height, width = bitmap.shape
        dest_width = _to_python_scalar(dest_width)
        dest_height = _to_python_scalar(dest_height)

        contours, _ = cv2.findContours((bitmap * 255).astype(np.uint8), cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        num_contours = min(len(contours), self.max_candidates)
        boxes = np.zeros((num_contours, 4, 2), dtype=np.int64)
        scores = np.zeros((num_contours,), dtype=np.float32)

        for index in range(num_contours):
            contour = contours[index].squeeze(1)
            points, sside = self.get_mini_boxes(contour)
            if sside < 2:
                continue
            points = np.array(points)
            score = self.box_score_fast(pred, contour)

            box = self.unclip(points, unclip_ratio=self.unclip_ratio).reshape(-1, 1, 2)
            box, sside = self.get_mini_boxes(box)
            box = np.array(box)

            box[:, 0] = np.clip(np.round(box[:, 0] / width * dest_width), 0, dest_width)
            box[:, 1] = np.clip(np.round(box[:, 1] / height * dest_height), 0, dest_height)
            boxes[index, :, :] = box.astype(np.int64)
            scores[index] = score
        return boxes, scores

    def unclip(self, box, unclip_ratio=1.5):
        """Offset ``box`` outwards by ``area * unclip_ratio / perimeter``.

        Returns:
            Array of the offset paths. Normally shape (1, n, 2); shape (0,)
            when the offset yields nothing. When several paths of different
            lengths come back, a 1-D object array with one (n_i, 2) array per
            path is returned.
        """
        poly = Polygon(box)
        distance = poly.area * unclip_ratio / poly.length
        offset = pyclipper.PyclipperOffset()
        offset.AddPath(box, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
        paths = offset.Execute(distance)

        if len({len(path) for path in paths}) > 1:
            # np.array() raises ValueError on ragged input since NumPy 1.24;
            # build the object array explicitly so len() still reports the
            # number of paths.
            expanded = np.empty(len(paths), dtype=object)
            for path_index, path in enumerate(paths):
                expanded[path_index] = np.array(path)
            return expanded
        return np.array(paths)

    def get_mini_boxes(self, contour):
        """Return the minimum-area rectangle of ``contour`` and its short side.

        The four corners are sorted by x; within the two left-most and the
        two right-most corners the y order decides the final position, giving
        [left/smaller-y, right/smaller-y, right/larger-y, left/larger-y].
        """
        bounding_box = cv2.minAreaRect(contour)
        points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])

        if points[1][1] > points[0][1]:
            index_1, index_4 = 0, 1
        else:
            index_1, index_4 = 1, 0
        if points[3][1] > points[2][1]:
            index_2, index_3 = 2, 3
        else:
            index_2, index_3 = 3, 2

        box = [points[index_1], points[index_2], points[index_3], points[index_4]]
        return box, min(bounding_box[1])

    def box_score_fast(self, bitmap, _box):
        """Mean of ``bitmap`` inside the polygon ``_box``.

        The mean is taken over the polygon's axis-aligned bounding window
        (clipped to the map), masked by the filled polygon.
        """
        h, w = bitmap.shape[:2]
        box = _box.copy()
        xmin = np.clip(np.floor(box[:, 0].min()).astype(np.int32), 0, w - 1)
        xmax = np.clip(np.ceil(box[:, 0].max()).astype(np.int32), 0, w - 1)
        ymin = np.clip(np.floor(box[:, 1].min()).astype(np.int32), 0, h - 1)
        ymax = np.clip(np.ceil(box[:, 1].max()).astype(np.int32), 0, h - 1)

        mask = np.zeros((ymax - ymin + 1, xmax - xmin + 1), dtype=np.uint8)
        # Shift the polygon into the coordinate frame of the cropped window.
        box[:, 0] = box[:, 0] - xmin
        box[:, 1] = box[:, 1] - ymin
        cv2.fillPoly(mask, box.reshape(1, -1, 2).astype(np.int32), 1)
        if bitmap.dtype == np.float16:
            # Widen half precision before handing the data to cv2.mean.
            bitmap = bitmap.astype(np.float32)
        return cv2.mean(bitmap[ymin:ymax + 1, xmin:xmax + 1], mask)[0]


class AverageMeter(object):
    """Computes and stores the average and current value"""

    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and return ``self``."""
        self.val = val
        self.sum += val * n
        self.count += n
        # A zero total weight (e.g. n=0 on a fresh meter) has no defined
        # average; keep 0 instead of raising ZeroDivisionError.
        self.avg = self.sum / self.count if self.count else 0
        return self


class DetectionIoUEvaluator(object):
    """IoU-based matching of detections against ground truth."""

    def __init__(self, is_output_polygon=False, iou_constraint=0.5, area_precision_constraint=0.5):
        self.is_output_polygon = is_output_polygon
        self.iou_constraint = iou_constraint
        self.area_precision_constraint = area_precision_constraint

    def evaluate_image(self, gt, pred):
        """Match detections to ground truth for one image.

        Args:
            gt: sequence of dicts with keys ``'points'`` and ``'ignore'``.
            pred: sequence of dicts with key ``'points'``.

        Returns:
            dict with precision / recall / hmean, the matched pairs, the IoU
            matrix (omitted when there are more than 100 detections), the
            polygons kept, care / don't-care counts and a text log.
        """

        def get_union(pD, pG):
            return Polygon(pD).union(Polygon(pG)).area

        def get_intersection(pD, pG):
            return Polygon(pD).intersection(Polygon(pG)).area

        def get_intersection_over_union(pD, pG):
            union = get_union(pD, pG)
            # Two zero-area polygons have no meaningful IoU; treat as no match.
            return 0.0 if union == 0 else get_intersection(pD, pG) / union

        def is_usable(points):
            shape = Polygon(points)
            return shape.is_valid and shape.is_simple

        det_matched = 0
        # Zero-initialised so the reported matrix never exposes uninitialised
        # memory when nothing can be compared.
        iou_mat = np.zeros([1, 1])
        gt_pols = []
        det_pols = []
        gt_pol_points = []
        det_pol_points = []
        # Indices (into gt_pols) of ground-truth polygons marked as don't care.
        gt_dont_care = []
        # Indices (into det_pols) of detections matched with a don't-care GT.
        det_dont_care = []
        pairs = []
        evaluation_log = ""

        for entry in gt:
            points = entry['points']
            dont_care = entry['ignore']
            if not is_usable(points):
                continue
            gt_pols.append(points)
            gt_pol_points.append(points)
            if dont_care:
                gt_dont_care.append(len(gt_pols) - 1)

        evaluation_log += "GT polygons: " + str(len(gt_pols)) + (
            " (" + str(len(gt_dont_care)) + " don't care)\n" if len(gt_dont_care) > 0 else "\n")

        for entry in pred:
            points = entry['points']
            if not is_usable(points):
                continue
            det_pols.append(points)
            det_pol_points.append(points)
            # A detection mostly covered by a don't-care region is itself
            # excluded from the statistics.
            for gt_index in gt_dont_care:
                intersected_area = get_intersection(gt_pols[gt_index], points)
                det_area = Polygon(points).area
                covered = 0 if det_area == 0 else intersected_area / det_area
                if covered > self.area_precision_constraint:
                    det_dont_care.append(len(det_pols) - 1)
                    break

        evaluation_log += "DET polygons: " + str(len(det_pols)) + (
            " (" + str(len(det_dont_care)) + " don't care)\n" if len(det_dont_care) > 0 else "\n")

        if len(gt_pols) > 0 and len(det_pols) > 0:
            iou_mat = np.empty([len(gt_pols), len(det_pols)])
            gt_taken = np.zeros(len(gt_pols), np.int8)
            det_taken = np.zeros(len(det_pols), np.int8)

            for gt_num in range(len(gt_pols)):
                for det_num in range(len(det_pols)):
                    if self.is_output_polygon:
                        iou_mat[gt_num, det_num] = get_intersection_over_union(
                            det_pols[det_num], gt_pols[gt_num])
                    else:
                        iou_mat[gt_num, det_num] = iou_rotate(
                            np.float32(det_pols[det_num]), np.float32(gt_pols[gt_num]))

            # Sets make the membership tests in the matching loop O(1).
            gt_dont_care_set = set(gt_dont_care)
            det_dont_care_set = set(det_dont_care)
            for gt_num in range(len(gt_pols)):
                if gt_num in gt_dont_care_set:
                    continue
                for det_num in range(len(det_pols)):
                    if gt_taken[gt_num] != 0:
                        break
                    if det_taken[det_num] != 0 or det_num in det_dont_care_set:
                        continue
                    if iou_mat[gt_num, det_num] > self.iou_constraint:
                        gt_taken[gt_num] = 1
                        det_taken[det_num] = 1
                        det_matched += 1
                        pairs.append({'gt': gt_num, 'det': det_num})
                        evaluation_log += "Match GT #" + str(gt_num) + " with Det #" + str(det_num) + "\n"

        num_gt_care = len(gt_pols) - len(gt_dont_care)
        num_det_care = len(det_pols) - len(det_dont_care)
        if num_gt_care == 0:
            recall = float(1)
            precision = float(0) if num_det_care > 0 else float(1)
        else:
            recall = float(det_matched) / num_gt_care
            precision = 0 if num_det_care == 0 else float(det_matched) / num_det_care

        hmean = 0 if (precision + recall) == 0 else 2.0 * precision * recall / (precision + recall)

        return {
            'precision': precision,
            'recall': recall,
            'hmean': hmean,
            'pairs': pairs,
            'iouMat': [] if len(det_pols) > 100 else iou_mat.tolist(),
            'gtPolPoints': gt_pol_points,
            'detPolPoints': det_pol_points,
            'gtCare': num_gt_care,
            'detCare': num_det_care,
            'gtDontCare': gt_dont_care,
            'detDontCare': det_dont_care,
            'detMatched': det_matched,
            'evaluationLog': evaluation_log
        }

    def combine_results(self, results):
        """Aggregate per-image results into global precision / recall / hmean."""
        num_global_care_gt = 0
        num_global_care_det = 0
        matched_sum = 0
        for result in results:
            num_global_care_gt += result['gtCare']
            num_global_care_det += result['detCare']
            matched_sum += result['detMatched']

        method_recall = 0 if num_global_care_gt == 0 else float(matched_sum) / num_global_care_gt
        method_precision = 0 if num_global_care_det == 0 else float(matched_sum) / num_global_care_det
        if method_recall + method_precision == 0:
            method_hmean = 0
        else:
            method_hmean = 2 * method_recall * method_precision / (method_recall + method_precision)

        return {'precision': method_precision, 'recall': method_recall, 'hmean': method_hmean}


class QuadMetric():
    """Batch-level wrapper around ``DetectionIoUEvaluator``."""

    def __init__(self, is_output_polygon=False):
        self.is_output_polygon = is_output_polygon
        self.evaluator = DetectionIoUEvaluator(is_output_polygon=is_output_polygon)

    def measure(self, batch, output, box_thresh=0.6):
        """Evaluate every image of a batch.

        Args:
            batch: dict; ``batch['text_polys']`` and ``batch['ignore_tags']``
                are read, one entry per image.
            output: ``output[0]`` holds per-image predicted polygons/boxes,
                ``output[1]`` the per-image scores.
            box_thresh: minimum score for a box to be evaluated. Only applied
                when ``is_output_polygon`` is false.

        Returns:
            list with one ``evaluate_image`` result per image.
        """
        results = []
        gt_polygons_batch = batch['text_polys']
        ignore_tags_batch = batch['ignore_tags']
        # Iterate the per-image containers directly: images rarely share the
        # same number of detections, and stacking such ragged data with
        # np.array() raises ValueError on NumPy >= 1.24.
        pred_polygons_batch = output[0]
        pred_scores_batch = output[1]

        for polygons, pred_polygons, pred_scores, ignore_tags in zip(
                gt_polygons_batch, pred_polygons_batch, pred_scores_batch, ignore_tags_batch):
            gt = [dict(points=np.int64(polygons[i]), ignore=ignore_tags[i]) for i in range(len(polygons))]
            if self.is_output_polygon:
                pred = [dict(points=pred_polygons[i]) for i in range(len(pred_polygons))]
            else:
                pred_polygons = np.asarray(pred_polygons)
                pred = [
                    dict(points=pred_polygons[i, :, :].astype(np.int32))
                    for i in range(pred_polygons.shape[0])
                    if pred_scores[i] >= box_thresh
                ]
            results.append(self.evaluator.evaluate_image(gt, pred))
        return results

    def validate_measure(self, batch, output, box_thresh=0.6):
        """Alias of ``measure``."""
        return self.measure(batch, output, box_thresh)

    def evaluate_measure(self, batch, output):
        """Return ``measure`` results plus a list derived from the batch size."""
        # NOTE(review): np.linspace(0, N) yields its default number of evenly
        # spaced samples between 0 and N, not N indices - confirm with
        # callers what the second value is meant to be.
        return self.measure(batch, output), np.linspace(0, batch['image'].shape[0]).tolist()

    def gather_measure(self, raw_metrics):
        """Combine per-batch results into precision / recall / fmeasure meters."""
        raw_metrics = [image_metrics
                       for batch_metrics in raw_metrics
                       for image_metrics in batch_metrics]

        result = self.evaluator.combine_results(raw_metrics)

        precision = AverageMeter()
        recall = AverageMeter()
        fmeasure = AverageMeter()

        precision.update(result['precision'], n=len(raw_metrics))
        recall.update(result['recall'], n=len(raw_metrics))
        # The epsilon keeps the division defined when both values are zero.
        fmeasure_score = 2 * precision.val * recall.val / (precision.val + recall.val + 1e-8)
        fmeasure.update(fmeasure_score)

        return {
            'precision': precision,
            'recall': recall,
            'fmeasure': fmeasure
        }


def shrink_polygon_py(polygon, shrink_ratio):
    """Scale a polygon about the mean of its vertices.

    Calling again with ``1 / shrink_ratio`` restores the original scale.

    Args:
        polygon: array of shape (n, 2). It is not modified.
        shrink_ratio: scale factor applied to each vertex offset from the mean.

    Returns:
        A new array with the dtype of ``polygon`` holding the scaled vertices.
    """
    # Work on a copy: the caller's polygon is typically still needed by later
    # transforms and must not be overwritten.
    scaled = np.array(polygon)
    cx = scaled[:, 0].mean()
    cy = scaled[:, 1].mean()
    scaled[:, 0] = cx + (scaled[:, 0] - cx) * shrink_ratio
    scaled[:, 1] = cy + (scaled[:, 1] - cy) * shrink_ratio
    return scaled


def shrink_polygon_pyclipper(polygon, shrink_ratio):
    """Shrink a polygon inwards with pyclipper.

    The offset distance is ``area * (1 - shrink_ratio ** 2) / perimeter``.

    Returns:
        (n, 2) array of the first resulting path, or an empty array when the
        polygon vanishes.
    """
    polygon_shape = Polygon(polygon)
    distance = polygon_shape.area * (1 - np.power(shrink_ratio, 2)) / polygon_shape.length
    subject = [tuple(point) for point in polygon]
    padding = pyclipper.PyclipperOffset()
    padding.AddPath(subject, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)
    shrunk = padding.Execute(-distance)
    if not shrunk:
        return np.array(shrunk)
    return np.array(shrunk[0]).reshape(-1, 2)


class MakeShrinkMap():
    r'''
    Making binary mask from detection data with ICDAR format.
    Typically following the process of class `MakeICDARData`.
    '''

    def __init__(self, min_text_size=4, shrink_ratio=0.4, shrink_type='pyclipper'):
        shrink_func_dict = {'py': shrink_polygon_py, 'pyclipper': shrink_polygon_pyclipper}
        self.shrink_func = shrink_func_dict[shrink_type]
        self.min_text_size = min_text_size
        self.shrink_ratio = shrink_ratio

    def __call__(self, data: dict) -> dict:
        """Add the shrink map and its training mask to ``data``.

        :param data: dict with at least 'imgs', 'text_polys' and 'ignore_tags'.
            'text_polys' and 'ignore_tags' are validated and updated in place.
        :return: ``data`` with 'shrink_map' (1 inside shrunk polygons) and
            'shrink_mask' (0 over ignored polygons) added.
        """
        image = data['imgs']
        text_polys = data['text_polys']
        ignore_tags = data['ignore_tags']

        h, w = image.shape[:2]
        text_polys, ignore_tags = self.validate_polygons(text_polys, ignore_tags, h, w)
        gt = np.zeros((h, w), dtype=np.float32)
        mask = np.ones((h, w), dtype=np.float32)

        for i in range(len(text_polys)):
            polygon = text_polys[i]
            height = max(polygon[:, 1]) - min(polygon[:, 1])
            width = max(polygon[:, 0]) - min(polygon[:, 0])
            if ignore_tags[i] or min(height, width) < self.min_text_size:
                # Ignored or too small: exclude the region via the mask.
                cv2.fillPoly(mask, polygon.astype(np.int32)[np.newaxis, :, :], 0)
                ignore_tags[i] = True
                continue

            shrunk = self.shrink_func(polygon, self.shrink_ratio)
            if shrunk.size == 0:
                # The polygon vanished when shrunk: treat it as ignored.
                cv2.fillPoly(mask, polygon.astype(np.int32)[np.newaxis, :, :], 0)
                ignore_tags[i] = True
                continue
            cv2.fillPoly(gt, [shrunk.astype(np.int32)], 1)

        data['shrink_map'] = gt
        data['shrink_mask'] = mask
        return data

    def validate_polygons(self, polygons, ignore_tags, h, w):
        '''
        Clip polygons to the image and flag degenerate ones (in place).

        polygons (numpy.array, required): of shape (num_instances, num_points, 2)
        '''
        if len(polygons) == 0:
            return polygons, ignore_tags
        assert len(polygons) == len(ignore_tags)
        for polygon in polygons:
            polygon[:, 0] = np.clip(polygon[:, 0], 0, w - 1)
            polygon[:, 1] = np.clip(polygon[:, 1], 0, h - 1)

        for i in range(len(polygons)):
            area = self.polygon_area(polygons[i])
            if abs(area) < 1:
                ignore_tags[i] = True
            # NOTE(review): polygon_area calls cv2.contourArea without the
            # oriented flag, so this test presumably fires for every
            # non-degenerate polygon regardless of winding - confirm intent.
            if area > 0:
                polygons[i] = polygons[i][::-1, :]
        return polygons, ignore_tags

    def polygon_area(self, polygon):
        """Area of ``polygon`` as reported by ``cv2.contourArea``."""
        return cv2.contourArea(polygon)


class MakeBorderMap():
    """Build the border (threshold) map and its mask for text polygons."""

    def __init__(self, shrink_ratio=0.4, thresh_min=0.3, thresh_max=0.7):
        self.shrink_ratio = shrink_ratio
        self.thresh_min = thresh_min
        self.thresh_max = thresh_max

    def __call__(self, data: dict) -> dict:
        """Add the threshold map and its mask to ``data``.

        :param data: dict with at least 'imgs', 'text_polys' and 'ignore_tags'.
        :return: ``data`` with 'threshold_map' (values rescaled into
            [thresh_min, thresh_max]) and 'threshold_mask' added.
        """
        im = data['imgs']
        text_polys = data['text_polys']
        ignore_tags = data['ignore_tags']

        canvas = np.zeros(im.shape[:2], dtype=np.float32)
        mask = np.zeros(im.shape[:2], dtype=np.float32)

        for i in range(len(text_polys)):
            if ignore_tags[i]:
                continue
            self.draw_border_map(text_polys[i], canvas, mask=mask)
        canvas = canvas * (self.thresh_max - self.thresh_min) + self.thresh_min

        data['threshold_map'] = canvas
        data['threshold_mask'] = mask
        return data

    def draw_border_map(self, polygon, canvas, mask):
        """Draw the border response of one polygon into ``canvas`` and ``mask``.

        The polygon is dilated by ``area * (1 - shrink_ratio ** 2) / perimeter``;
        the dilated region is filled in ``mask`` and, inside its bounding
        window, ``canvas`` receives ``1 - normalised distance to the nearest
        polygon edge`` (merged with existing values via ``np.fmax``).
        """
        polygon = np.array(polygon)
        assert polygon.ndim == 2
        assert polygon.shape[1] == 2

        polygon_shape = Polygon(polygon)
        if polygon_shape.area <= 0:
            return
        distance = polygon_shape.area * (1 - np.power(self.shrink_ratio, 2)) / polygon_shape.length
        subject = [tuple(point) for point in polygon]
        padding = pyclipper.PyclipperOffset()
        padding.AddPath(subject, pyclipper.JT_ROUND, pyclipper.ET_CLOSEDPOLYGON)

        padded_paths = padding.Execute(distance)
        if not padded_paths:
            # Nothing to draw if the offset produced no polygon.
            return
        padded_polygon = np.array(padded_paths[0])
        cv2.fillPoly(mask, [padded_polygon.astype(np.int32)], 1.0)

        xmin = padded_polygon[:, 0].min()
        xmax = padded_polygon[:, 0].max()
        ymin = padded_polygon[:, 1].min()
        ymax = padded_polygon[:, 1].max()
        width = xmax - xmin + 1
        height = ymax - ymin + 1

        # Move the polygon into the coordinate frame of the bounding window.
        polygon[:, 0] = polygon[:, 0] - xmin
        polygon[:, 1] = polygon[:, 1] - ymin

        xs = np.broadcast_to(
            np.linspace(0, width - 1, num=width).reshape(1, width), (height, width))
        ys = np.broadcast_to(
            np.linspace(0, height - 1, num=height).reshape(height, 1), (height, width))

        distance_map = np.zeros(
            (polygon.shape[0], height, width), dtype=np.float32)
        for i in range(polygon.shape[0]):
            j = (i + 1) % polygon.shape[0]
            absolute_distance = self.distance(xs, ys, polygon[i], polygon[j])
            distance_map[i] = np.clip(absolute_distance / distance, 0, 1)
        distance_map = distance_map.min(axis=0)

        # Part of the window that lies inside the canvas.
        xmin_valid = min(max(0, xmin), canvas.shape[1] - 1)
        xmax_valid = min(max(0, xmax), canvas.shape[1] - 1)
        ymin_valid = min(max(0, ymin), canvas.shape[0] - 1)
        ymax_valid = min(max(0, ymax), canvas.shape[0] - 1)
        canvas[ymin_valid:ymax_valid + 1, xmin_valid:xmax_valid + 1] = np.fmax(
            1 - distance_map[
                ymin_valid - ymin:ymax_valid - ymax + height,
                xmin_valid - xmin:xmax_valid - xmax + width],
            canvas[ymin_valid:ymax_valid + 1, xmin_valid:xmax_valid + 1])

    def distance(self, xs, ys, point_1, point_2):
        '''
        compute the distance from point to a line
        ys: coordinates in the first axis
        xs: coordinates in the second axis
        point_1, point_2: (x, y), the end of the line
        '''
        square_distance_1 = np.square(xs - point_1[0]) + np.square(ys - point_1[1])
        square_distance_2 = np.square(xs - point_2[0]) + np.square(ys - point_2[1])
        square_distance = np.square(point_1[0] - point_2[0]) + np.square(point_1[1] - point_2[1])

        cosin = (square_distance - square_distance_1 - square_distance_2) / (
            2 * np.sqrt(square_distance_1 * square_distance_2))
        square_sin = 1 - np.square(cosin)
        square_sin = np.nan_to_num(square_sin)
        # Rounding can push cosin**2 marginally above 1 for pixels lying on
        # the line; clamp so the square root below cannot turn them into NaN.
        square_sin = np.maximum(square_sin, 0)
        result = np.sqrt(square_distance_1 * square_distance_2 * square_sin / square_distance)

        # Where cosin is negative, use the distance to the nearer end point
        # instead of the perpendicular distance to the line.
        result[cosin < 0] = np.sqrt(np.fmin(square_distance_1, square_distance_2))[cosin < 0]
        return result

    def extend_line(self, point_1, point_2, result):
        """Draw extensions of the segment beyond both end points into ``result``.

        Each end point is extended away from the other by
        ``(1 + shrink_ratio)`` times the segment vector.

        Returns:
            The two extended end points as integer tuples.
        """
        ex_point_1 = (int(round(point_1[0] + (point_1[0] - point_2[0]) * (1 + self.shrink_ratio))),
                      int(round(point_1[1] + (point_1[1] - point_2[1]) * (1 + self.shrink_ratio))))
        cv2.line(result, tuple(ex_point_1), tuple(point_1), 4096.0, 1, lineType=cv2.LINE_AA, shift=0)
        ex_point_2 = (int(round(point_2[0] + (point_2[0] - point_1[0]) * (1 + self.shrink_ratio))),
                      int(round(point_2[1] + (point_2[1] - point_1[1]) * (1 + self.shrink_ratio))))
        cv2.line(result, tuple(ex_point_2), tuple(point_2), 4096.0, 1, lineType=cv2.LINE_AA, shift=0)
        return ex_point_1, ex_point_2