21. Concurrency Processing
Concurrency Processing
Prerequisites
1
python
1. Concurrency Processing
In image processing or real-time vision systems, heavy processing can easily block the camera input pipeline. In the non-concurrent example, frame capture, image processing, and display are executed sequentially in a single thread. In the concurrent version, camera capture and image processing are separated into different threads. The capture thread continuously reads the latest frame from the webcam and pushes it into a queue, while the processing thread independently performs the heavy operation.
This design is commonly used in real-time computer vision systems because maintaining low latency is usually more important than processing every single frame.
This design can also be extended, for example by saving logs when processing fails or by saving the images we are looking for, without blocking the important main processing.
2. Concurrency Processing Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import cv2 as cv
import threading
import queue
import time
# Single-slot hand-off from the capture thread to the processing thread.
# maxsize=1 together with put_latest() means only the newest frame is kept.
raw_queue = queue.Queue(maxsize=1)
# Single-slot queue for processed frames.
# NOTE(review): not referenced by any code visible in this listing.
processed_queue = queue.Queue(maxsize=1)
# Shared shutdown flag: polled by every loop; set when "q" is pressed or when
# the camera cannot be opened.
stop_event = threading.Event()
# Most recent raw frame (written by capture_thread, read by main).
latest_frame = None
# Most recent processed frame (written by processing_thread, read by main).
latest_processed = None
# Guards reads and writes of latest_frame and latest_processed.
frame_lock = threading.Lock()
def put_latest(q, item):
    """Enqueue *item* without blocking, evicting the oldest entry if *q* is full.

    Meant for "latest value wins" hand-offs between threads: the producer
    never waits, and a slow consumer only ever sees recent data.

    Args:
        q: A ``queue.Queue`` (bounded or unbounded).
        item: The object to enqueue.
    """
    while True:
        try:
            # Try the put first instead of asking q.full(): the answer from
            # full() can be stale by the time the put actually runs.
            q.put_nowait(item)
            return
        except queue.Full:
            # No room: discard the oldest entry and retry, so the newest item
            # is the one that ends up in the queue.
            try:
                q.get_nowait()
            except queue.Empty:
                # A consumer drained the queue in the meantime; just retry.
                pass
def capture_thread():
    """Continuously read webcam frames and publish the most recent one.

    Opens camera index 0. Every successfully read frame is copied into the
    module-level ``latest_frame`` (under ``frame_lock``) and handed to
    ``raw_queue`` through ``put_latest``. The loop runs until ``stop_event``
    is set. If the camera cannot be opened, a message is printed,
    ``stop_event`` is set so the other loops stop too, and the function
    returns.
    """
    global latest_frame

    cap = cv.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        stop_event.set()
        return

    try:
        while not stop_event.is_set():
            ret, frame = cap.read()
            if not ret:
                # Back off briefly instead of spinning at full speed while the
                # camera yields no frames; wait() returns early on shutdown.
                stop_event.wait(0.01)
                continue

            with frame_lock:
                latest_frame = frame.copy()

            put_latest(raw_queue, frame)
    finally:
        # Release the device even if the loop exits through an exception.
        cap.release()
def processing_thread():
    """Run the slow image operation on frames taken from ``raw_queue``.

    Waits up to 0.1 s for a frame so that ``stop_event`` is re-checked
    regularly, applies an 11x11 Gaussian blur 500 times to simulate heavy
    work, and stores a copy of the result in the module-level
    ``latest_processed`` under ``frame_lock``. Returns once ``stop_event``
    is set.
    """
    global latest_processed

    while not stop_event.is_set():
        try:
            frame = raw_queue.get(timeout=0.1)
        except queue.Empty:
            continue

        processed = frame.copy()
        for _ in range(500):
            # Abandon the pass as soon as shutdown is requested, so joining
            # this thread does not have to wait for all remaining blurs.
            if stop_event.is_set():
                return
            processed = cv.GaussianBlur(processed, (11, 11), 0)

        with frame_lock:
            latest_processed = processed.copy()
def main():
    """Start the capture and processing threads and run the display loop.

    The main thread shows the newest raw frame and the newest processed
    frame in two windows until "q" is pressed or ``stop_event`` is set by a
    worker. On exit, including an exception or Ctrl+C, ``stop_event`` is set,
    both threads are joined, and all OpenCV windows are closed.
    """
    t1 = threading.Thread(target=capture_thread)
    t2 = threading.Thread(target=processing_thread)

    t1.start()
    t2.start()

    try:
        while not stop_event.is_set():
            # Copy under the lock, draw outside it, so the worker threads are
            # blocked only for the duration of the copy.
            with frame_lock:
                frame = None if latest_frame is None else latest_frame.copy()
                processed = (
                    None if latest_processed is None else latest_processed.copy()
                )

            if frame is not None:
                cv.imshow("Original - Live", frame)

            if processed is not None:
                cv.imshow("Processed - Slow", processed)

            if cv.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Always signal the workers. Without this, an error in the display
        # loop would leave the non-daemon threads running and the process
        # would never exit.
        stop_event.set()
        t1.join()
        t2.join()
        cv.destroyAllWindows()


if __name__ == "__main__":
    main()
Result: Concurrent processing
3. Non-Concurrent Processing Code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import cv2 as cv
def main():
    """Capture, process, and display frames sequentially in a single thread.

    Each loop iteration reads one frame from camera index 0, applies an
    11x11 Gaussian blur 500 times, and shows both the original and the
    processed image. Because everything runs in sequence, the live view
    updates only as fast as the heavy processing allows. The loop ends when
    "q" is pressed or a frame cannot be read.
    """
    cap = cv.VideoCapture(0)
    if not cap.isOpened():
        print("Cannot open camera")
        return

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Failed to read frame")
                break

            processed = frame.copy()
            for _ in range(500):
                processed = cv.GaussianBlur(processed, (11, 11), 0)

            cv.imshow("Original - Live", frame)
            cv.imshow("Processed", processed)

            if cv.waitKey(1) & 0xFF == ord("q"):
                break
    finally:
        # Free the camera and close the windows even if the loop raises or
        # the user interrupts with Ctrl+C.
        cap.release()
        cv.destroyAllWindows()


if __name__ == "__main__":
    main()

