# Interactive TRF healing tool. Hotkeys:
#   a     - add a zone
#   t     - carry the zones from this frame over to the next frame
#   u     - like 't', but shift the zones by the estimated motion
#   x / f - compute optical flow over the zones (full / partial affine)
#   z     - clear all data for this frame (rely on predicted motion)
#   b     - create the standard zones and compute optical flow
#   o / p - previous / next frame
#   e     - toggle histogram-equalization mode
#   c / r - remove all zones / remove the last zone
#   n / m - skip to the next frame with few local motions ('m' also stops at zones)
#   d     - jump to the next known duplicate frame
#   g     - go to a frame number (type the digits, then Enter)
#   h     - batch re-track a range of frames
#   F2    - save the TRF and the zone data
#   q, y  - quit

import json
import math
import os
import pickle
import random
import sys

import cv2
import numpy as np

import inver_stab
from trfparser import (TrfFrame, TrfFrameCompressed, TrfLocalMotion,
                       parse_trf, write_trf)


def filter_keypoints(kps, bboxes):
    """Drop keypoints that fall inside any of the given bounding boxes."""
    filtered_kps = []
    for kp in list(kps):
        hitsbbox = False
        for bbox in bboxes:
            bb = bbox['bb']
            if (kp[0, 0] > bb[0] and kp[0, 1] > bb[1]
                    and kp[0, 0] < bb[0] + bb[2] and kp[0, 1] < bb[1] + bb[3]):
                hitsbbox = True
                break
        if not hitsbbox:
            filtered_kps.append(kp)
    return np.array(filtered_kps)


def transform_from_trf_lms(local_motions, return_identity_on_fail=True):
    """Fit an affine transform to a frame's local motions."""
    identity = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    if len(local_motions) < 3:
        return identity if return_identity_on_fail else None
    prev_kps = [[lm.x, lm.y] for lm in local_motions]
    cur_kps = [[lm.x + lm.dx, lm.y + lm.dy] for lm in local_motions]
    # estimateAffine2D now, since that is what we use everywhere else
    trans, _ = cv2.estimateAffine2D(np.array(prev_kps, dtype=np.float32),
                                    np.array(cur_kps, dtype=np.float32),
                                    refineIters=20)
    if trans is None:
        return identity if return_identity_on_fail else None
    return trans


def trans2xy(trans):
    """Extract the translation column of a 2x3 affine transform."""
    if trans is None:
        return np.array((0, 0))
    return np.array((trans[0, 2], trans[1, 2]))


def paint_1(img, local_motions, scale):
    """Draw the local motions and the estimated speed onto the preview image."""
    for lm in local_motions:
        x1 = int(lm.x / scale)
        y1 = int(lm.y / scale)
        x2 = x1 + int(lm.dx / scale)
        y2 = y1 + int(lm.dy / scale)
        cv2.circle(img, (x1, y1), 3, (0, 255, 0))
        cv2.line(img, (x1, y1), (x2, y2), (0, 0, 255), thickness=2)
    trans = transform_from_trf_lms(local_motions)
    cv2.putText(img, f'LMs: {len(local_motions)}', (50, 210),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    cv2.putText(img, f'SPEED: {trans2xy(trans)}', (50, 180),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)


def probabilistic_round(x):
    """Round x up with probability equal to its fractional part, else down."""
    return int(math.floor(x + random.random()))
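# A minimal sanity check for the two helpers above (illustrative sketch; never
# called by the tool). It avoids assuming TrfLocalMotion's constructor
# signature by using SimpleNamespace stand-ins with the same x/y/dx/dy fields.
def _demo_transform_from_trf_lms():
    from types import SimpleNamespace
    # Synthetic local motions describing a pure (+5, -3) translation.
    lms = [SimpleNamespace(x=float(x), y=float(y), dx=5.0, dy=-3.0)
           for x, y in [(100, 100), (500, 120), (300, 400), (800, 700)]]
    trans = transform_from_trf_lms(lms)
    print('recovered shift:', trans2xy(trans))  # expected to be close to [5, -3]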
# TODO: move this function into the trfparser library.
def write_trf_frame(f, frame, startframe=0):
    """Serialize one frame's local motions in TRF text format."""
    lm_str_list = []
    if hasattr(frame, 'local_motions_compressed'):
        for dx, dy, x, y in frame.local_motions_compressed:
            lm_str_list.append('(LM {0} {1} {2} {3} 112 0.5 0.5)'.format(dx, dy, x, y))
    else:
        for lm in frame.local_motions:
            lm_str_list.append('(LM {0} {1} {2} {3} 112 0.5 0.5)'.format(
                probabilistic_round(lm.dx),
                probabilistic_round(lm.dy),
                probabilistic_round(lm.x),
                probabilistic_round(lm.y)))
    f.write('Frame {0} (List {1} [{2}])\n'.format(
        frame.idx - startframe, len(lm_str_list), ','.join(lm_str_list)))


def convert_points_to_localmotions(flow_data):
    if flow_data is None:
        return []
    pts0 = flow_data['pts0']
    pts1 = flow_data['pts1']
    return [TrfLocalMotion(p0[0] - p1[0], p0[1] - p1[1], p1[0], p1[1])
            for p0, p1 in zip(pts0, pts1)]


def convert_points_to_localmotions2(flow_data):
    # Same conversion as above, but packs (dx, dy, x, y) rows into an int16
    # array; adding uniform noise before flooring performs the probabilistic
    # rounding in bulk.
    if flow_data is None or len(flow_data['pts0']) == 0:
        return []
    pts0 = np.array(flow_data['pts0'])
    pts1 = np.array(flow_data['pts1'])
    res = np.concatenate([pts0[:, 0] - pts1[:, 0], pts0[:, 1] - pts1[:, 1],
                          pts1[:, 0], pts1[:, 1]]).reshape((4, -1)).T
    res += np.random.rand(*res.shape)
    return np.int16(np.floor(res))


def convert_points_to_cv2keypoints(pts, radius=0.5):
    return [cv2.KeyPoint(float(pt[0]), float(pt[1]), radius) for pt in pts]


def kp_grid(bbox, step=9):
    """Regular grid of (x, y) points covering bbox = (x, y, w, h)."""
    x = np.arange(bbox[0], bbox[0] + bbox[2], step)
    y = np.arange(bbox[1], bbox[1] + bbox[3], step)
    xv, yv = np.meshgrid(x, y)
    grid = np.array((xv, yv)).T
    return grid.reshape((-1, 2))


def save_trf(outputpath, picklepath, trfframes, bboxes, new_tracking_cached, duplicates=[]):
    with open(picklepath, 'wb') as f:
        pickle.dump(bboxes, f)
    new_frames = []
    for frame in trfframes:
        if frame.idx in new_tracking_cached:
            # Replace the frame with the re-tracked, compressed local motions.
            new_frames.append(TrfFrameCompressed(
                frame.idx,
                local_motions=convert_points_to_localmotions2(new_tracking_cached[frame.idx])))
        else:
            new_frames.append(frame)
    write_trf(outputpath, new_frames, 0)


def gen_motions_pyrlk(prev_frame, frame, bboxes, draw_equi):
    win_size = 30
    if len(bboxes) == 0:
        return None
    if draw_equi:
        prev_frame = cv2.equalizeHist(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY))
        frame = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    kps_grids = np.concatenate([np.array(kp_grid(bbox['bb']), dtype=np.float32)
                                for bbox in bboxes]).reshape(-1, 1, 2)
    cur_kps, status, err = cv2.calcOpticalFlowPyrLK(prev_frame, frame, kps_grids, None,
                                                    winSize=(win_size, win_size), maxLevel=5)
    if status is None:
        return None
    prev_matched_kps = []
    cur_matched_kps = []
    for prev_kp, cur_kp, match in zip(kps_grids, cur_kps, status):
        if match:
            prev_matched_kps.append(prev_kp[0])
            cur_matched_kps.append(cur_kp[0])
    if len(prev_matched_kps) < 3:
        prev_matched_kps = []
        cur_matched_kps = []
        trans = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    else:
        trans, inliers = cv2.estimateAffine2D(np.array(prev_matched_kps),
                                              np.array(cur_matched_kps), refineIters=20)
        prev_matched_kps = [kp for kp, is_inlier in zip(prev_matched_kps, inliers) if is_inlier]
        cur_matched_kps = [kp for kp, is_inlier in zip(cur_matched_kps, inliers) if is_inlier]
    return {'pts0': prev_matched_kps, 'pts1': cur_matched_kps, 'trans': trans}
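# Rough usage sketch for gen_motions_pyrlk (illustrative; never called by the
# tool): track a noise-textured frame against a copy of itself shifted 4 px to
# the right. The translation column of the returned transform should come out
# near (4, 0).
def _demo_gen_motions_pyrlk():
    rng = np.random.default_rng(0)
    prev = cv2.cvtColor((rng.random((540, 960)) * 255).astype(np.uint8),
                        cv2.COLOR_GRAY2BGR)
    cur = np.roll(prev, 4, axis=1)
    flow = gen_motions_pyrlk(prev, cur, [{'bb': np.array([100, 100, 300, 200])}],
                             draw_equi=False)
    if flow is not None:
        print('estimated transform:\n', flow['trans'])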
rlof_tracker = inver_stab.InverTracker()
rlof_tracker.init(1920, 1080)


def gen_motions_rlof(prev_frame, frame, bboxes, draw_equi, use_affine):
    if len(bboxes) == 0:
        return None
    if draw_equi:
        prev_frame = cv2.equalizeHist(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY))
        frame = cv2.equalizeHist(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    kps_grids = np.concatenate([np.array(kp_grid(bbox['bb'], step=20), dtype=np.float32)
                                for bbox in bboxes]).reshape(-1, 1, 2)
    # Earlier cv2.optflow-based variant, kept for reference:
    # rlofParam = cv2.optflow.RLOFOpticalFlowParameter_create()
    # rlofParam.setUseInitialFlow(False)
    # rlofParam.setGlobalMotionRansacThreshold(10)
    # rlofParam.setUseIlluminationModel(True)
    # rlofParam.setMaxLevel(300)
    # rlofParam.setLargeWinSize(30 + 1)
    # rlofParam.setMaxIteration(300)
    # cur_kps, status, err = cv2.optflow.calcOpticalFlowSparseRLOF(
    #     prev_frame, frame, kps_grids, None, rlofParam=rlofParam)
    try:
        cur_kps, status = rlof_tracker.rlof1(prev_frame, frame, kps_grids)
    except Exception:
        return None
    if status is None:
        return None
    prev_matched_kps = []
    cur_matched_kps = []
    for prev_kp, cur_kp, match in zip(kps_grids, cur_kps, status):
        if match:
            prev_matched_kps.append(prev_kp[0])
            cur_matched_kps.append(cur_kp[0])
    if len(prev_matched_kps) < 3:
        prev_matched_kps = []
        cur_matched_kps = []
        trans = np.array([[1, 0, 0], [0, 1, 0]], dtype=np.float32)
    else:
        fun = cv2.estimateAffine2D if use_affine else cv2.estimateAffinePartial2D
        trans, inliers = fun(np.array(prev_matched_kps), np.array(cur_matched_kps),
                             refineIters=20)
        prev_matched_kps = [kp for kp, is_inlier in zip(prev_matched_kps, inliers) if is_inlier]
        cur_matched_kps = [kp for kp, is_inlier in zip(cur_matched_kps, inliers) if is_inlier]
    return {'pts0': prev_matched_kps, 'pts1': cur_matched_kps, 'trans': trans}


gen_motions = gen_motions_rlof


def create_std_bbox():
    # Four standard zones in the corners of a 1920x1080 frame.
    W = 1920
    H = 1080
    SH = 400
    SW = 600
    OFFSET = 30
    return [{'bb': np.array([OFFSET, OFFSET, SW, SH])},
            {'bb': np.array([W - OFFSET - SW, H - OFFSET - SH, SW, SH])},
            {'bb': np.array([W - OFFSET - SW, OFFSET, SW, SH])},
            {'bb': np.array([OFFSET, H - OFFSET - SH, SW, SH])}]
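# Usage sketch for the active tracker (illustrative; never called by the tool,
# and it requires the native inver_stab module): run the default gen_motions
# over the standard corner zones of two consecutive frames and return the
# fitted affine transform, or None when tracking fails.
def _demo_gen_motions(prev_frame, frame):
    flow = gen_motions(prev_frame, frame, create_std_bbox(), draw_equi=False,
                       use_affine=True)
    return None if flow is None else flow['trans']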
def heal_trf_interactively(vidname, trfnames, picklename, outputname, badruns):
    video = cv2.VideoCapture(vidname)
    dupfile_name = vidname + '.duplicates.json'
    dups = []
    dups_pos = -1
    try:
        with open(dupfile_name, 'r') as f:
            dups = json.load(f)
        print(f'loaded dups from {dupfile_name}')
    except Exception:
        print(f"file {dupfile_name} doesn't exist, can't load")

    # Exit if the video could not be opened.
    if not video.isOpened():
        print('Could not open video')
        sys.exit()

    print('Parsing TRF')
    trfframes = []
    for trfname in trfnames:
        trf_flag = ''
        trf_offset = 0
        if len(trfname) == 3:
            trfname, trf_flag, trf_offset = trfname
        new_frames = parse_trf(trfname)
        if trf_flag == 'append':
            # Pad with empty frames, then append the new frames after the gap.
            trfframes += [TrfFrame(len(trfframes) + 1 + i, []) for i in range(trf_offset)]
            for frm in new_frames:
                frm.idx = len(trfframes) + 1 + trf_offset
                trfframes.append(frm)
        else:
            if len(new_frames) >= len(trfframes):
                trfframes = new_frames
            else:
                trfframes[0:len(new_frames)] = new_frames
    fix_numbers = True
    if fix_numbers:
        # Renumber so that trfframes[i].idx == i + 1.
        for i in range(len(trfframes)):
            trfframes[i].idx = i + 1
    print(f'Loaded {len(trfframes)} frames...')

    frame_idx = 0
    ok, frame = video.read()
    if not ok:
        print('Cannot read video file')
        sys.exit()

    draw_equi = False
    trf_frame = None
    prev_frame = None
    bboxes = []
    scale_factor = 2
    bbox_data = {}
    new_tracking_cached = {}
    cur_bad_run_idx = 0
    is_saved = True
    try:
        if os.path.exists(picklename):
            with open(picklename, 'rb') as f:
                bbox_data = pickle.load(f)
            _probe = bbox_data[list(bbox_data.keys())[0]]  # raises if the dict is empty
    except Exception:
        print('failed to load pickle')
        bbox_data = {}

    def change_frame(old_frame_idx, new_frame_idx, save_bboxes=False):
        nonlocal bboxes
        bbox_data[old_frame_idx] = bboxes.copy()
        if not save_bboxes:
            if new_frame_idx in bbox_data:
                bboxes = bbox_data[new_frame_idx]
            else:
                bboxes = []
        else:
            bbox_data[new_frame_idx] = bboxes.copy()

    def update_bbox_data():
        nonlocal is_saved
        bbox_data[frame_idx] = bboxes.copy()
        is_saved = False

    def go_to_frame(frame_to_go):
        nonlocal frame_idx, prev_frame, frame
        old_frame_idx = frame_idx
        video.set(cv2.CAP_PROP_POS_FRAMES, frame_to_go - 2)
        frame_idx = frame_to_go
        ok, prev_frame = video.read()
        if not ok:
            print('Cannot read video file')
            sys.exit()
        ok, frame = video.read()
        if not ok:
            print('Cannot read video file')
            sys.exit()
        change_frame(old_frame_idx, frame_to_go)

    def do_tracking():
        if len(bboxes) == 0 and frame_idx in new_tracking_cached:
            del new_tracking_cached[frame_idx]
        else:
            motions = gen_motions(prev_frame, frame, bboxes, draw_equi, True)
            if motions is not None:
                new_tracking_cached[frame_idx] = motions

    while True:
        if prev_frame is None:
            prev_frame = np.zeros(frame.shape, dtype='uint8')
        frame_resized = cv2.resize(frame, (960, 540))
        frame_idx = int(video.get(cv2.CAP_PROP_POS_FRAMES))

        if frame_idx > 1:
            trf_frame = trfframes[frame_idx - 1]

        if draw_equi:
            frame_resized_gray = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2GRAY)
            frame_resized = cv2.cvtColor(cv2.equalizeHist(frame_resized_gray), cv2.COLOR_GRAY2BGR)

        cv2.putText(frame_resized,
                    f'FRAME {frame_idx} trf={trf_frame.idx if trf_frame is not None else -1}',
                    (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
        if 0 <= cur_bad_run_idx < len(badruns):
            cv2.putText(frame_resized, f'Run N{cur_bad_run_idx}: {badruns[cur_bad_run_idx]}',
                        (100, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (50, 170, 50), 2)
        if not is_saved:
            cv2.putText(frame_resized, 'NOT SAVED', (200, 300),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.75, (50, 50, 170), 2)

        if trf_frame is not None:
            scale = 2
            if frame_idx in new_tracking_cached:
                paint_1(frame_resized,
                        convert_points_to_localmotions(new_tracking_cached[frame_idx]), scale)
            else:
                paint_1(frame_resized, trf_frame.local_motions, scale)

        for bbox in bboxes:
            bb = bbox['bb']
            scaled_bbox = bb / scale_factor
            p1 = (int(scaled_bbox[0]), int(scaled_bbox[1]))
            p2 = (int(scaled_bbox[0] + scaled_bbox[2]), int(scaled_bbox[1] + scaled_bbox[3]))
            cv2.rectangle(frame_resized, p1, p2, (255, 0, 0), 2, 1)
cv2.imshow("Tracking", frame_resized) k = cv2.waitKey(0) & 0xff print(k) if k == ord('q'): print('y to exit') k = cv2.waitKey(0) & 0xff if k == ord('y'): break elif k == ord('e'): draw_equi = not draw_equi elif k == 191: # F2 print(f'Saving... {outputname}') save_trf(outputname, picklename, trfframes, bbox_data, new_tracking_cached) print('saved') is_saved = True elif k == ord('d'): if len(dups) > 0: dups_pos += 1 if dups_pos >= len(dups): dups_pos = 0 go_to_frame(dups[dups_pos]) elif k == ord('a'): cv2.destroyAllWindows() bboxes.append({ 'bb': np.array(cv2.selectROI(frame_resized, False)) * scale_factor }) cv2.destroyAllWindows() update_bbox_data() elif k == ord('z'): # remove any data, rely on predicted new_tracking_cached[frame_idx] = { 'pts0': [], 'pts1': [], 'trans': None } is_saved = False elif k == ord('x') or k == ord('f'): # track use_affine = k==ord('x') if len(bboxes) == 0 and frame_idx in new_tracking_cached: del new_tracking_cached[frame_idx] else: motions = gen_motions(prev_frame, frame, bboxes, draw_equi, use_affine=use_affine) if motions is not None: new_tracking_cached[frame_idx] = motions is_saved = False elif k == ord('c'): # remove all boxes bboxes = [] update_bbox_data() elif k == ord('r'): # remove box if len(bboxes) > 0: bboxes = bboxes[0:-1] update_bbox_data() elif k == ord('b'): bboxes = create_std_bbox() update_bbox_data() do_tracking() elif k == ord('n') or k == ord('m'): nn = frame_idx + 1 for _ in range(len(trfframes)): if nn >= len(trfframes): nn = 1 print('skipped to the end') numlm = 0 if nn in new_tracking_cached and 'p0' in new_tracking_cached: numlm = len(new_tracking_cached['p0']) else: numlm = len(trfframes[nn - 1].local_motions) is_box = nn in bbox_data and len(bbox_data[nn]) > 0 if numlm < 100 or (is_box and k == ord('m')): go_to_frame(nn) break nn += 1 # elif k in [ord('0')]: # cur_bad_run_idx = min(len(badruns) - 1, cur_bad_run_idx + 1) # go_to_frame(badruns[cur_bad_run_idx][0]) # elif k in [ord('9')]: # cur_bad_run_idx = max(0, cur_bad_run_idx - 1) # go_to_frame(badruns[cur_bad_run_idx][0]) elif k in [ord('p'), ord('t'), ord('u')]: prev_frame = frame ok, frame = video.read() save_bbox = k == ord('t') if k == ord('u') and frame_idx in new_tracking_cached: dx, dy = trans2xy(new_tracking_cached[frame_idx]['trans']) else: dx = 0 dy = 0 if save_bbox: new_bboxes = [] for bbox in bboxes: new_bbox = bbox.copy() new_bbox['bb'] = bbox['bb'].copy() new_bbox['bb'][0] += dx new_bbox['bb'][1] += dy new_bboxes.append(new_bbox) if not ok: print('Cannot read video file. 
                print('Cannot read video file. Reached the end')
                frame = prev_frame.copy()
            else:
                frame_idx += 1
                change_frame(frame_idx - 1, frame_idx, save_bboxes=save_bbox)
        elif k == ord('o'):
            video.set(cv2.CAP_PROP_POS_FRAMES, frame_idx - 3)
            ok, prev_frame = video.read()
            if not ok:
                print('Cannot read video file')
                sys.exit()
            ok, frame = video.read()
            if not ok:
                print('Cannot read video file')
                sys.exit()
            frame_idx -= 1
            change_frame(frame_idx + 1, frame_idx)
        elif k == ord('g'):
            # Go to a frame: type the digits, finish with Enter.
            try:
                inp_str = ''
                while True:
                    k = cv2.waitKey(0) & 0xff
                    if k in map(ord, '0123456789'):
                        inp_str += str(k - ord('0'))
                    else:
                        break
                print(inp_str)
                if k != ord('\n') and k != ord('\r'):
                    continue
                try:
                    frame_to_go = int(inp_str)
                except ValueError:
                    continue
                go_to_frame(frame_to_go)
            except Exception:
                print('not a number')
        elif k == ord('h'):
            # Batch mode: re-run tracking over a range of frames.
            try:
                start_range = int(input('Enter start range to replace keypoints: '))
                end_range = input('Enter end range to replace keypoints: ')
                if end_range == 'all':
                    end_range = len(trfframes) - 1
                else:
                    end_range = int(end_range)
                do_only_with_bboxes = str(input('Do only non-empty with bboxes (y): ')) == 'y'
            except Exception:
                print('incorrect input')
                continue
            for frm_idx in range(start_range, end_range + 1):
                nn = frm_idx + 1
                if nn >= len(trfframes):
                    nn = 1
                    print('skipped to the end')
                if nn in new_tracking_cached and 'pts0' in new_tracking_cached[nn]:
                    numlm = len(new_tracking_cached[nn]['pts0'])
                else:
                    numlm = len(trfframes[nn - 1].local_motions)
                if not do_only_with_bboxes:
                    print(f'doing frame {frm_idx}')
                    go_to_frame(frm_idx)
                    bboxes = create_std_bbox()
                    update_bbox_data()
                    do_tracking()
                elif len(bbox_data.get(frm_idx, [])) > 0 and numlm > 0:
                    print(f'doing frame {frm_idx}')
                    go_to_frame(frm_idx)
                    do_tracking()


def heal_trf_interactively_std(path):
    heal_trf_interactively(path, [f'{path}.trf'], f'{path}.healed.trf.pickle',
                           f'{path}.trf', badruns=[])


heal_trf_interactively_std('/home/vlad/Загрузки/DSC_6300 Samodelkina warmup 1.mp4')
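# Hypothetical multi-part invocation (the file names here are invented), based
# on the 'append' branch in heal_trf_interactively: stitch a second TRF in
# after a 10-frame gap instead of overlaying it over the first one.
# heal_trf_interactively(
#     'clip.mp4',
#     ['clip.part1.trf', ('clip.part2.trf', 'append', 10)],
#     'clip.healed.trf.pickle',
#     'clip.out.trf',
#     badruns=[])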