#############################################################
#
# FILE: CircAqNumPyArr.py
# DATE: 04/28/2022
# COMPANY: BitFlow, Inc.
# DESCRIPTION: Example illustrating the capture of image data
# into a Numpy array for further processing in
# Python. The image is being displayed using an
# OpenCV imshow window.
#
#############################################################
import platform
import sys

if platform.system() == 'Windows':
    import msvcrt
    # Comparing against a tuple stays correct for any later major version,
    # unlike testing the major and minor fields separately.
    if sys.version_info >= (3, 8):
        import os
        # Python 3.8 and greater require the DLL locations to be registered
        # explicitly; this must happen before the BitFlow module is imported.
        # Raw strings keep the backslashes literal without relying on
        # unrecognised escape sequences such as "\B" or "\P".
        os.add_dll_directory(r"C:\BitFlow SDK 6.5\Bin64")
        os.add_dll_directory(r"C:\Program Files\CameraLink\Serial")

import BFModule.BufferAcquisition as Buf
import numpy
import cv2
import threading
import time
from datetime import datetime
def AcqThread(ThreadName, CirAq, BufArray):
    """Display loop executed on the worker thread.

    While the module-level ``keepUpdating`` flag is true, wait for the next
    frame, convert the matching buffer to 16 bits and show it in the OpenCV
    window titled "Image Window".

    Parameters:
        ThreadName: label supplied by the caller; not used in the body.
        CirAq: acquisition object; only ``WaitForFrame`` is called on it.
        BufArray: indexable collection of image buffers, indexed with the
            ``BufferNumber`` of the value returned by ``WaitForFrame``.

    Any exception raised during an iteration is printed and the loop carries
    on, so the function only returns once ``keepUpdating`` is false.
    """
    while keepUpdating:
        try:
            frame_info = CirAq.WaitForFrame(1000)
            raw_frame = BufArray[frame_info.BufferNumber]
            # OpenCV imshow does not support uint32 images, hence the
            # conversion to 16 bits before display.
            cv2.imshow("Image Window", raw_frame.astype(numpy.uint16))
            cv2.waitKey(1)
        except Exception as err:
            print(err)
def _prompt_buffer_count():
    """Ask on stdin until a positive whole number of buffers is entered."""
    while True:
        try:
            count = int(input('\nEnter number of buffers to allocate: '))
        except ValueError:
            count = 0
        # Zero and negative counts are rejected along with non-numeric text.
        if count > 0:
            return count
        print('Invalid entry. Try again.')


def _print_menu(CirAq):
    """Print the single-letter console commands understood by the example."""
    print("\nPress 'G' to start Acquisition")
    print("Press 'S' to Stop Acquisition")
    # The software trigger is only advertised when the board does not report
    # the free-run trigger mode.
    if CirAq.GetTriggerMode() != Buf.TriggerModes.FreeRun:
        print("Press 'T' to issue a SW Trigger")
    print("Press 'R' to read a register value")
    print("Press 'W' to write to a register")
    print("Press 'A' to get Acquisition Status")
    print("\nPress 'X' to exit test\n")


def _lookup_register(CirAq):
    """Prompt for a register name; return (name, id), or None if lookup fails."""
    regName = input('Enter Register Name:')
    try:
        regID = CirAq.RegId(regName)
    except Exception as e:
        # The failure is reported and the command abandoned; the console
        # loop keeps running.
        print(e)
        return None
    return regName, regID


def _print_acq_status(CirAq):
    """Print the acquisition status flags followed by the capture status."""
    print('Acquisition Status:')
    AqStat = CirAq.GetAcqStatus()
    print(' -Start:\t', AqStat.Start)
    print(' -Stop:\t', AqStat.Stop)
    print(' -Abort:\t', AqStat.Abort)
    print(' -Pause:\t', AqStat.Pause)
    print(' -Cleanup:\t', AqStat.Cleanup)
    print(CirAq.GetCaptureStatus())


def _run_command_loop(CirAq):
    """Read commands from stdin and act on them until 'X' is entered."""
    while True:
        # Normalise once; surrounding whitespace and letter case are ignored.
        cmd = input().strip().upper()
        if cmd == 'X':
            # A running acquisition is stopped synchronously before leaving.
            if CirAq.GetAcqStatus().Start:
                CirAq.AqControl(Buf.AcqCommands.Stop, Buf.AcqControlOptions.Wait)
            print('Exit Program.')
            return
        elif cmd == 'G':
            print('Aquisition Started.')
            CirAq.AqControl(Buf.AcqCommands.Start, Buf.AcqControlOptions.Async)
        elif cmd == 'S':
            CirAq.AqControl(Buf.AcqCommands.Stop, Buf.AcqControlOptions.Async)
            print('Aquisition Stopped.')
            # The capture status is requested here but its value is not used.
            CirAq.GetCaptureStatus()
        elif cmd == 'T':
            CirAq.IssueSoftwareTrigger()
            print('Trigger Sent.')
        elif cmd == 'R':
            print('Reading register value . . .')
            found = _lookup_register(CirAq)
            if found is not None:
                regName, regID = found
                print('Reg ', regName, ' value = ', CirAq.RegPeek(regID))
        elif cmd == 'W':
            print('Writing to a register . . .')
            found = _lookup_register(CirAq)
            if found is not None:
                _, regID = found
                try:
                    regVal = int(input("Enter value to write: "))
                except ValueError:
                    # A mistyped value must not abort the whole session.
                    print('Invalid value. Register not written.')
                else:
                    CirAq.RegPoke(regID, regVal)
        elif cmd == 'A':
            _print_acq_status(CirAq)


def _stop_display_thread(worker):
    """Tell the display thread to finish, wait for it, then close the windows."""
    global keepUpdating
    keepUpdating = False
    worker.join()
    # destroy the OpenCV image window
    cv2.destroyAllWindows()


# The main function
def main():
    """Run the interactive circular-acquisition example.

    Opens a board, allocates the number of buffers entered by the user, sets
    up acquisition with the default options, starts ``AcqThread`` on a worker
    thread to display frames, and then serves console commands until 'X' is
    entered.

    Every teardown step is registered as soon as the matching setup step has
    succeeded, so the worker thread is stopped and the board resources are
    released even when an exception (for example EOF on stdin or Ctrl-C)
    interrupts the session. Without this, a still-running worker thread would
    keep the process alive after the main thread failed.
    """
    global keepUpdating

    # Imported locally so this function carries its own dependency.
    from contextlib import ExitStack

    print('Circular Acquisition Example')
    print('---------------------------------')

    CirAq = Buf.clsCircularAcquisition(Buf.ErrorMode.ErIgnore)

    # Callbacks run in reverse order of registration: stop the display
    # thread, clean up acquisition, free the buffers, close the board.
    with ExitStack() as teardown:
        # Call Open board function by showing the Board select dialog
        CirAq.Open()
        teardown.callback(CirAq.Close)

        # Bits-per-pixel is queried from the board; the value is not needed
        # further down.
        CirAq.GetBoardInfo(Buf.InquireParams.BitsPerPix)

        # Allocate the requested number of buffers
        BufArray = CirAq.BufferSetup(_prompt_buffer_count())
        teardown.callback(CirAq.BufferCleanup)

        # Setup acquisition using the default options
        CirAq.AqSetup(Buf.SetupOptions.setupDefault)
        teardown.callback(CirAq.AqCleanup)

        t1 = threading.Thread(target=AcqThread, args=('AcqThread', CirAq, BufArray))
        # The flag must be true before the thread starts, otherwise its loop
        # would exit immediately.
        keepUpdating = True
        t1.start()
        teardown.callback(_stop_display_thread, t1)

        _print_menu(CirAq)
        _run_command_loop(CirAq)
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()