Post

18. Optical Flow

18. Optical Flow

Optical Flow


Prerequisites

1
python

1. Optical Flow

Optical Flow is a computer vision technique used to estimate the motion of objects, surfaces, or pixels between consecutive image frames.

There are two main types of Optical Flow: sparse and dense. Sparse Optical Flow tracks only selected feature points such as corners or textured regions, making it faster and suitable for tracking applications. A common example is the Lucas-Kanade method. Dense Optical Flow, such as the Farneback algorithm, computes motion for every pixel in the image, providing a complete motion field but requiring more computation.

2. Optical Flow Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# NOTE(review): this script relies on `cv` (OpenCV), `np` (NumPy) and `view`
# (provides MultiImageViewer) being imported earlier in the file; those import
# lines are not visible in this section — confirm they exist.

# Per-frame displacement bounds (in point-coordinate units) used to decide
# which tracked points are drawn and kept.
MIN_MOTION = 2.0    # below this a point is treated as static background
MAX_MOTION = 100.0  # above this a track is treated as unstable/wrong


def main():
    """Track moving feature points from camera 0 with sparse optical flow.

    Corners are detected with ``cv.goodFeaturesToTrack`` and followed from
    frame to frame with pyramidal Lucas-Kanade (``cv.calcOpticalFlowPyrLK``).
    Only points whose displacement lies in ``[MIN_MOTION, MAX_MOTION]`` are
    drawn and carried over to the next frame. New corners are added every
    10 frames or whenever fewer than 20 points remain.

    Keys: ``q`` quits, ``r`` discards all tracks and re-detects corners.

    The body lives in a function (rather than directly under the
    ``__main__`` guard) because the early exits use ``return``, which is a
    syntax error at module level.
    """
    cap = cv.VideoCapture(0)

    if not cap.isOpened():
        return

    # From here on the capture device is open, so every exit path (early
    # return, 'q', end of stream, or an exception) must release it.
    try:
        ret, old_frame = cap.read()

        if not ret:
            return

        old_gray = cv.cvtColor(old_frame, cv.COLOR_BGR2GRAY)
        feature_params = dict(maxCorners=80, qualityLevel=0.03, minDistance=20, blockSize=7, useHarrisDetector=True, k=0.04)
        # Loop-invariant, so built once instead of on every frame.
        lk_params = dict(winSize=(21, 21), maxLevel=3, criteria=(cv.TERM_CRITERIA_EPS | cv.TERM_CRITERIA_COUNT, 20, 0.03))
        p0 = cv.goodFeaturesToTrack(old_gray, mask=None, **feature_params)
        frame_count = 0

        viewer = view.MultiImageViewer([old_frame], True)

        while True:
            ret, frame = cap.read()

            if not ret:
                break

            frame_gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
            frame_count += 1
            display = frame.copy()
            next_points = []

            if p0 is not None and len(p0) > 0:
                p1, status, err = cv.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)

                if p1 is not None and status is not None:
                    # status == 1 marks points for which flow was found.
                    good_new = p1[status == 1]
                    good_old = p0[status == 1]

                    for new, old in zip(good_new, good_old):
                        x_new, y_new = new.ravel()
                        x_old, y_old = old.ravel()

                        dx = x_new - x_old
                        dy = y_new - y_old
                        dist = np.sqrt(dx * dx + dy * dy)

                        # Remove static background points
                        if dist < MIN_MOTION:
                            continue

                        # Remove unstable/wrong tracking points
                        if dist > MAX_MOTION:
                            continue

                        cv.arrowedLine(display, (int(x_old), int(y_old)), (int(x_new), int(y_new)), (0, 255, 0), 2, tipLength=0.4)
                        cv.circle(display, (int(x_new), int(y_new)), 4, (0, 0, 255), -1)
                        next_points.append(new)

            # Restore the (N, 1, 2) float32 layout that calcOpticalFlowPyrLK
            # is given as its point array on the next iteration.
            if len(next_points) > 0:
                p0 = np.array(next_points, dtype=np.float32).reshape(-1, 1, 2)
            else:
                p0 = None

            # Re-detect points periodically or when points are too few
            if frame_count % 10 == 0 or p0 is None or len(p0) < 20:
                detect_mask = np.ones_like(frame_gray, dtype=np.uint8) * 255

                # Zero out a disc around every point still being tracked so
                # the detector only proposes corners elsewhere.
                if p0 is not None:
                    for point in p0:
                        x, y = point.ravel()
                        cv.circle(detect_mask, (int(x), int(y)), 20, 0, -1)

                new_points = cv.goodFeaturesToTrack(frame_gray, mask=detect_mask, **feature_params)

                if new_points is not None:
                    if p0 is None:
                        p0 = new_points
                    else:
                        p0 = np.vstack([p0, new_points])

            cv.putText(display, f"Tracked moving points: {0 if p0 is None else len(p0)}", (20, 30), cv.FONT_HERSHEY_SIMPLEX, 0.75, (0, 255, 0), 2)
            viewer.update_images(display)
            viewer.draw()

            old_gray = frame_gray.copy()

            key = cv.waitKey(1) & 0xFF

            if key == ord("q"):
                break

            if key == ord("r"):
                # Manual reset: drop every track and start from fresh corners.
                p0 = cv.goodFeaturesToTrack(frame_gray, mask=None, **feature_params)
                old_gray = frame_gray.copy()
    finally:
        cap.release()
        cv.destroyAllWindows()


if __name__ == "__main__":
    main()
This post is licensed under CC BY 4.0 by the author.