Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Write video to file #154

Open
Naomipython opened this issue Mar 29, 2023 · 2 comments
Open

Write video to file #154

Naomipython opened this issue Mar 29, 2023 · 2 comments

Comments

@Naomipython
Copy link

Hello!

I am trying to write frames to a file, but it does not work. There are no error messages, but the file I write the frames to does not work. We are using the USB camera (1800 U-240c).

I used the Asynchronous Grab with OpenCV Example to start with, and modified it to write the file with open cv. When I run the script, it does show the current frames (although it is slow) but when I click on the file that was made (open with standard windows media player or similar), the media player gives an error message.

I am just a beginner with open cv and vimba, so I hope someone here knows more.

import threading
import numpy as np
import sys
import cv2
from typing import Optional
from vimba import *
from datetime import datetime
import time
from cv2 import VideoWriter
from cv2 import VideoWriter_fourcc
import pandas as pd


def print_preamble():
    print('///////////////////////////////////////////////////////')
    print('/// Vimba API Asynchronous Grab with OpenCV Example ///')
    print('///////////////////////////////////////////////////////\n')


def print_usage():
    print('Usage:')
    print('    python asynchronous_grab_opencv.py [camera_id]')
    print('    python asynchronous_grab_opencv.py [/h] [-h]')
    print()
    print('Parameters:')
    print('    camera_id   ID of the camera to use (using first camera if not specified)')
    print()


def abort(reason: str, return_code: int = 1, usage: bool = False):
    print(reason + '\n')

    if usage:
        print_usage()

    sys.exit(return_code)


def parse_args() -> Optional[str]:
    args = sys.argv[1:]
    argc = len(args)

    for arg in args:
        if arg in ('/h', '-h'):
            print_usage()
            sys.exit(0)

    if argc > 1:
        abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)

    return None if argc == 0 else args[0]


def get_camera(camera_id: Optional[str]) -> Camera:
    with Vimba.get_instance() as vimba:
        if camera_id:
            try:
                return vimba.get_camera_by_id(camera_id)

            except VimbaCameraError:
                abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))

        else:
            cams = vimba.get_all_cameras()
            if not cams:
                abort('No Cameras accessible. Abort.')

            return cams[0]


def setup_camera(cam: Camera):
    with cam:
        # Enable auto exposure time setting if camera supports it
        try:
            cam.ExposureAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Enable white balancing if camera supports it
        try:
            cam.BalanceWhiteAuto.set('Continuous')

        except (AttributeError, VimbaFeatureError):
            pass

        # Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
        try:
            cam.GVSPAdjustPacketSize.run()

            while not cam.GVSPAdjustPacketSize.is_done():
                pass

        except (AttributeError, VimbaFeatureError):
            pass

        # Query available, open_cv compatible pixel formats
        # prefer color formats over monochrome formats
        cv_fmts = intersect_pixel_formats(cam.get_pixel_formats(), OPENCV_PIXEL_FORMATS)
        color_fmts = intersect_pixel_formats(cv_fmts, COLOR_PIXEL_FORMATS)

        if color_fmts:
            cam.set_pixel_format(color_fmts[0])

        else:
            mono_fmts = intersect_pixel_formats(cv_fmts, MONO_PIXEL_FORMATS)

            if mono_fmts:
                cam.set_pixel_format(mono_fmts[0])
                print(cam.set_pixel_format(mono_fmts[0]))

            else:
                abort('Camera does not support a OpenCV compatible format natively. Abort.')


def main():
    print_preamble()

    with Vimba.get_instance() as vimba:
        cams = vimba.get_all_cameras()
        with cams[0] as cam:
            setup_camera(cam)
            video = VideoWriter('newvid.avi', VideoWriter_fourcc(*'MJPG'), 25.0, (1216, 1936))

            while True:
                frame = cam.get_frame()
                frame.convert_pixel_format(PixelFormat.Bgra8)
                frame = frame.as_opencv_image()

                cv2.imshow('show', frame)

                video.write(frame)
                # check if frame size is correct
                # newframe = np.reshape(np.array(frame), (frame.shape[0], frame.shape[1]))
                # DF = pd.DataFrame(newframe)
                # DF.to_csv('data1.csv')

                if cv2.waitKey(1) & 0xFF == ord('d'):
                    break

            cam.stop_streaming()
            cv2.destroyAllWindows()
            video.release()


if __name__ == '__main__':
    main()
@arunprakash-avt
Copy link

With respect tot the above issue I would request you to refer tot the below example code:

import sys
import threading
from typing import Optional
from vimba import *
import cv2
import queue

frame_queue = queue.Queue()

writer = None

def print_preamble():
print('///////////////////////////////////////////')
print('/// Vimba API Asynchronous Grab Example ///')
print('///////////////////////////////////////////\n')

def print_usage():
print('Usage:')
print(' python asynchronous_grab.py [camera_id]')
print(' python asynchronous_grab.py [/h] [-h]')
print()
print('Parameters:')
print(' camera_id ID of the camera to use (using first camera if not specified)')
print()

def abort(reason: str, return_code: int = 1, usage: bool = False):
print(reason + '\n')

if usage:
    print_usage()

sys.exit(return_code)

def parse_args() -> Optional[str]:
args = sys.argv[1:]
argc = len(args)

for arg in args:
    if arg in ('/h', '-h'):
        print_usage()
        sys.exit(0)

if argc > 1:
    abort(reason="Invalid number of arguments. Abort.", return_code=2, usage=True)

return None if argc == 0 else args[0]

def get_camera(camera_id: Optional[str]) -> Camera:
with Vimba.get_instance() as vimba:
if camera_id:
try:
return vimba.get_camera_by_id(camera_id)

        except VimbaCameraError:
            abort('Failed to access Camera \'{}\'. Abort.'.format(camera_id))

    else:
        cams = vimba.get_all_cameras()
        if not cams:
            abort('No Cameras accessible. Abort.')

        return cams[0]

def setup_camera(cam: Camera):
with cam:
# Try to adjust GeV packet size. This Feature is only available for GigE - Cameras.
try:
cam.GVSPAdjustPacketSize.run()

        while not cam.GVSPAdjustPacketSize.is_done():
            pass

    except (AttributeError, VimbaFeatureError):
        pass
    # Set acquistion frame rate to 25 fps
    try:
        cam.TriggerSource.Set("FixedRate")
        cam.AcquisitionFrameRateAbs.Set(25)

    except (AttributeError, VimbaFeatureError):
        pass

def frame_handler(cam: Camera, frame: Frame):
print('{} acquired {}'.format(cam, frame), flush=True)

global frame_queue
frame_queue.put(frame.as_opencv_image())
cv_image = frame.as_opencv_image()
cv2.imshow("Display", cv_image)
cv2.waitKey(1)
cam.queue_frame(frame)

def write_to_file():
global frame_queue
global writer

while True:
    img = frame_queue.get()
    print("took image from queue")
    if writer is None:
        height, width = img.shape[0:2]
        size = (width, height)
        print(size)
        fourcc = cv2.VideoWriter_fourcc(*'MJPG')
        writer = cv2.VideoWriter('filename2.avi',
                                 fourcc,
                                 25,
                                 size)
    
    writer.write(cv2.cvtColor(img, cv2.COLOR_GRAY2BGR))
    frame_queue.task_done()

def main():
print_preamble()
cam_id = parse_args()

with Vimba.get_instance():
    with get_camera(cam_id) as cam:

        setup_camera(cam)
        print('Press <enter> to stop Frame acquisition.')

        try:
            # Start Streaming with a custom a buffer of 10 Frames (defaults to 5)
            threading.Thread(target=write_to_file, daemon=True).start()
            cam.start_streaming(handler=frame_handler, buffer_count=10)
            input()

        finally:
            cam.stop_streaming()
            frame_queue.join()
            writer.release()

if name == 'main':
main()

@aakash-sarin
Copy link

Hi,
Im hitting enter after the required number of frames have been captured but it returns an AttributeError: 'numpy.ndarray' object has no attribute 'join'.
How do i resolve this?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants