-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimulator.py
78 lines (63 loc) · 2.7 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import cv2
import numpy as np
import requests
from logger import Logger
class Simulator:
def __init__(self, width=None, height=None, video_source=0, bg_path=None, vcam_source='/dev/video1',
stream_to_vcam=True):
self.logger = Logger.logger
self.stream_to_vcam = stream_to_vcam
self.videocap = cv2.VideoCapture(video_source)
if not width or not height:
width, height = int(self.videocap.get(cv2.CAP_PROP_FRAME_WIDTH)), \
int(self.videocap.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.logger.info('Create a virtualwebcam with a solution of {}x{}'.format(width, height))
# Configure Videocapture
self.videocap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
self.videocap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
# Configure background from passed bg_path
background = cv2.imread(bg_path)
self.background = cv2.resize(background, (width, height))
if stream_to_vcam:
from pyfakewebcam import pyfakewebcam
self.vcam = pyfakewebcam.FakeWebcam(vcam_source, width, height)
def get_mask(self, frame, bodypix_url='http://localhost:9000'):
_, data = cv2.imencode(".jpg", frame)
r = requests.post(
url=bodypix_url,
data=data.tobytes(),
headers={'Content-Type': 'application/octet-stream'})
mask = np.frombuffer(r.content, dtype=np.uint8)
mask = mask.reshape((frame.shape[0], frame.shape[1]))
return mask
def simulate(self, show_stream=False):
self.logger.info("Create a virtual webcam.")
while True:
ret, frame = self.videocap.read()
mask = self.get_mask(frame)
# Merge original frame and its mask.
inv_mask = 1 - mask
merged = np.copy(frame)
for c in range(frame.shape[2]):
merged[:, :, c] = merged[:, :, c] * mask + self.background[:, :, c] * inv_mask
if show_stream:
cv2.imshow('Original', frame)
cv2.imshow('Mask', mask * 255)
cv2.imshow('Merged', merged)
# Write image to virtual cam input.
if self.stream_to_vcam:
merged = cv2.cvtColor(merged, cv2.COLOR_BGR2RGB)
self.vcam.schedule_frame(merged)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# When everything done, release the capture
self.videocap.release()
cv2.destroyAllWindows()
if __name__ == '__main__':
simulator = Simulator(
width=640,
height=480,
bg_path='assets/background.jpeg',
stream_to_vcam=False
)
simulator.simulate(show_stream=True)