# -*- coding: utf-8 -*- """ Tests the speed of image updates for an ImageItem and RawImageWidget. The speed will generally depend on the type of data being shown, whether it is being scaled and/or converted by lookup table, and whether OpenGL is used by the view widget """ import argparse import sys import numpy as np import pyqtgraph as pg import pyqtgraph.ptime as ptime from pyqtgraph.Qt import QtGui, QtCore, QT_LIB import importlib ui_template = importlib.import_module(f'VideoTemplate_{QT_LIB.lower()}') try: import cupy as cp pg.setConfigOption("useCupy", True) _has_cupy = True except ImportError: cp = None _has_cupy = False parser = argparse.ArgumentParser(description="Benchmark for testing video performance") parser.add_argument('--cuda', default=False, action='store_true', help="Use CUDA to process on the GPU", dest="cuda") parser.add_argument('--dtype', default='uint8', choices=['uint8', 'uint16', 'float'], help="Image dtype (uint8, uint16, or float)") parser.add_argument('--frames', default=3, type=int, help="Number of image frames to generate (default=3)") parser.add_argument('--image-mode', default='mono', choices=['mono', 'rgb'], help="Image data mode (mono or rgb)", dest='image_mode') parser.add_argument('--levels', default=None, type=lambda s: tuple([float(x) for x in s.split(',')]), help="min,max levels to scale monochromatic image dynamic range, or rmin,rmax,gmin,gmax,bmin,bmax to scale rgb") parser.add_argument('--lut', default=False, action='store_true', help="Use color lookup table") parser.add_argument('--lut-alpha', default=False, action='store_true', help="Use alpha color lookup table", dest='lut_alpha') parser.add_argument('--size', default='512x512', type=lambda s: tuple([int(x) for x in s.split('x')]), help="WxH image dimensions default='512x512'") args = parser.parse_args(sys.argv[1:]) #QtGui.QApplication.setGraphicsSystem('raster') app = pg.mkQApp("Video Speed Test Example") win = QtGui.QMainWindow() win.setWindowTitle('pyqtgraph example: VideoSpeedTest') ui = ui_template.Ui_MainWindow() ui.setupUi(win) win.show() try: from pyqtgraph.widgets.RawImageWidget import RawImageGLWidget except ImportError: RawImageGLWidget = None ui.rawGLRadio.setEnabled(False) ui.rawGLRadio.setText(ui.rawGLRadio.text() + " (OpenGL not available)") else: ui.rawGLImg = RawImageGLWidget() ui.stack.addWidget(ui.rawGLImg) # read in CLI args ui.cudaCheck.setChecked(args.cuda and _has_cupy) ui.cudaCheck.setEnabled(_has_cupy) ui.framesSpin.setValue(args.frames) ui.widthSpin.setValue(args.size[0]) ui.heightSpin.setValue(args.size[1]) ui.dtypeCombo.setCurrentText(args.dtype) ui.rgbCheck.setChecked(args.image_mode=='rgb') ui.maxSpin1.setOpts(value=255, step=1) ui.minSpin1.setOpts(value=0, step=1) levelSpins = [ui.minSpin1, ui.maxSpin1, ui.minSpin2, ui.maxSpin2, ui.minSpin3, ui.maxSpin3] if args.cuda and _has_cupy: xp = cp else: xp = np if args.levels is None: ui.scaleCheck.setChecked(False) ui.rgbLevelsCheck.setChecked(False) else: ui.scaleCheck.setChecked(True) if len(args.levels) == 2: ui.rgbLevelsCheck.setChecked(False) ui.minSpin1.setValue(args.levels[0]) ui.maxSpin1.setValue(args.levels[1]) elif len(args.levels) == 6: ui.rgbLevelsCheck.setChecked(True) for spin,val in zip(levelSpins, args.levels): spin.setValue(val) else: raise ValueError("levels argument must be 2 or 6 comma-separated values (got %r)" % (args.levels,)) ui.lutCheck.setChecked(args.lut) ui.alphaCheck.setChecked(args.lut_alpha) #ui.graphicsView.useOpenGL() ## buggy, but you can try it if you need extra speed. vb = pg.ViewBox() ui.graphicsView.setCentralItem(vb) vb.setAspectLocked() img = pg.ImageItem() vb.addItem(img) LUT = None def updateLUT(): global LUT, ui dtype = ui.dtypeCombo.currentText() if dtype == 'uint8': n = 256 else: n = 4096 LUT = ui.gradient.getLookupTable(n, alpha=ui.alphaCheck.isChecked()) if _has_cupy and xp == cp: LUT = cp.asarray(LUT) ui.gradient.sigGradientChanged.connect(updateLUT) updateLUT() ui.alphaCheck.toggled.connect(updateLUT) def updateScale(): global ui, levelSpins if ui.rgbLevelsCheck.isChecked(): for s in levelSpins[2:]: s.setEnabled(True) else: for s in levelSpins[2:]: s.setEnabled(False) updateScale() ui.rgbLevelsCheck.toggled.connect(updateScale) cache = {} def mkData(): with pg.BusyCursor(): global data, cache, ui, xp frames = ui.framesSpin.value() width = ui.widthSpin.value() height = ui.heightSpin.value() cacheKey = (ui.dtypeCombo.currentText(), ui.rgbCheck.isChecked(), frames, width, height) if cacheKey not in cache: if cacheKey[0] == 'uint8': dt = xp.uint8 loc = 128 scale = 64 mx = 255 elif cacheKey[0] == 'uint16': dt = xp.uint16 loc = 4096 scale = 1024 mx = 2**16 elif cacheKey[0] == 'float': dt = xp.float32 loc = 1.0 scale = 0.1 mx = 1.0 else: raise ValueError(f"unable to handle dtype: {cacheKey[0]}") if ui.rgbCheck.isChecked(): data = xp.random.normal(size=(frames,width,height,3), loc=loc, scale=scale) data = pg.gaussianFilter(data, (0, 6, 6, 0)) else: data = xp.random.normal(size=(frames,width,height), loc=loc, scale=scale) data = pg.gaussianFilter(data, (0, 6, 6)) if cacheKey[0] != 'float': data = xp.clip(data, 0, mx) data = data.astype(dt) data[:, 10, 10:50] = mx data[:, 9:12, 48] = mx data[:, 8:13, 47] = mx cache = {cacheKey: data} # clear to save memory (but keep one to prevent unnecessary regeneration) data = cache[cacheKey] updateLUT() updateSize() def updateSize(): global ui, vb frames = ui.framesSpin.value() width = ui.widthSpin.value() height = ui.heightSpin.value() dtype = xp.dtype(str(ui.dtypeCombo.currentText())) rgb = 3 if ui.rgbCheck.isChecked() else 1 ui.sizeLabel.setText('%d MB' % (frames * width * height * rgb * dtype.itemsize / 1e6)) vb.setRange(QtCore.QRectF(0, 0, width, height)) def noticeCudaCheck(): global xp, cache cache = {} if ui.cudaCheck.isChecked(): if _has_cupy: xp = cp else: xp = np ui.cudaCheck.setChecked(False) else: xp = np mkData() mkData() ui.dtypeCombo.currentIndexChanged.connect(mkData) ui.rgbCheck.toggled.connect(mkData) ui.widthSpin.editingFinished.connect(mkData) ui.heightSpin.editingFinished.connect(mkData) ui.framesSpin.editingFinished.connect(mkData) ui.widthSpin.valueChanged.connect(updateSize) ui.heightSpin.valueChanged.connect(updateSize) ui.framesSpin.valueChanged.connect(updateSize) ui.cudaCheck.toggled.connect(noticeCudaCheck) ptr = 0 lastTime = ptime.time() fps = None def update(): global ui, ptr, lastTime, fps, LUT, img if ui.lutCheck.isChecked(): useLut = LUT else: useLut = None downsample = ui.downsampleCheck.isChecked() if ui.scaleCheck.isChecked(): if ui.rgbLevelsCheck.isChecked(): useScale = [ [ui.minSpin1.value(), ui.maxSpin1.value()], [ui.minSpin2.value(), ui.maxSpin2.value()], [ui.minSpin3.value(), ui.maxSpin3.value()]] else: useScale = [ui.minSpin1.value(), ui.maxSpin1.value()] else: useScale = None if ui.rawRadio.isChecked(): ui.rawImg.setImage(data[ptr%data.shape[0]], lut=useLut, levels=useScale) ui.stack.setCurrentIndex(1) elif ui.rawGLRadio.isChecked(): ui.rawGLImg.setImage(data[ptr%data.shape[0]], lut=useLut, levels=useScale) ui.stack.setCurrentIndex(2) else: img.setImage(data[ptr%data.shape[0]], autoLevels=False, levels=useScale, lut=useLut, autoDownsample=downsample) ui.stack.setCurrentIndex(0) #img.setImage(data[ptr%data.shape[0]], autoRange=False) ptr += 1 now = ptime.time() dt = now - lastTime lastTime = now if fps is None: fps = 1.0/dt else: s = np.clip(dt*3., 0, 1) fps = fps * (1-s) + (1.0/dt) * s ui.fpsLabel.setText('%0.2f fps' % fps) app.processEvents() ## force complete redraw for every plot timer = QtCore.QTimer() timer.timeout.connect(update) timer.start(0) ## Start Qt event loop unless running in interactive mode or using pyside. if __name__ == '__main__': import sys if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'): QtGui.QApplication.instance().exec_()