This repository has been archived on 2020-08-22. You can view files and clone it, but cannot push or open issues or pull requests.
pyaudviz/src/core.py

543 lines
18 KiB
Python

import sys
import os
from PyQt5 import QtCore, QtGui, uic
from os.path import expanduser
import subprocess as sp
import numpy
from PIL import Image
from shutil import rmtree
import time
from collections import OrderedDict
import json
from importlib import import_module
from PyQt5.QtCore import QStandardPaths
import string
class Core():
def __init__(self):
self.FFMPEG_BIN = self.findFfmpeg()
self.dataDir = QStandardPaths.writableLocation(
QStandardPaths.AppConfigLocation
)
self.presetDir = os.path.join(self.dataDir, 'presets')
if getattr(sys, 'frozen', False):
# frozen
self.wd = os.path.dirname(sys.executable)
else:
# unfrozen
self.wd = os.path.dirname(os.path.realpath(__file__))
self.componentsPath = os.path.join(self.wd, 'components')
self.settings = QtCore.QSettings(
os.path.join(self.dataDir, 'settings.ini'),
QtCore.QSettings.IniFormat
)
self.loadEncoderOptions()
self.videoFormats = Core.appendUppercase([
'*.mp4',
'*.mov',
'*.mkv',
'*.avi',
'*.webm',
'*.flv',
])
self.audioFormats = Core.appendUppercase([
'*.mp3',
'*.wav',
'*.ogg',
'*.fla',
'*.flac',
'*.aac',
])
self.imageFormats = Core.appendUppercase([
'*.png',
'*.jpg',
'*.tif',
'*.tiff',
'*.gif',
'*.bmp',
'*.ico',
'*.xbm',
'*.xpm',
])
self.findComponents()
self.selectedComponents = []
# copies of named presets to detect modification
self.savedPresets = {}
def findComponents(self):
def findComponents():
for f in sorted(os.listdir(self.componentsPath)):
name, ext = os.path.splitext(f)
if name.startswith("__"):
continue
elif ext == '.py':
yield name
self.modules = [
import_module('components.%s' % name)
for name in findComponents()
]
# store canonical module names and indexes
self.moduleIndexes = [i for i in range(len(self.modules))]
self.compNames = [mod.Component.__doc__ for mod in self.modules]
self.altCompNames = []
# store alternative names for modules
for i, mod in enumerate(self.modules):
if hasattr(mod.Component, 'names'):
for name in mod.Component.names():
self.altCompNames.append((name, i))
def componentListChanged(self):
for i, component in enumerate(self.selectedComponents):
component.compPos = i
def insertComponent(self, compPos, moduleIndex, loader):
'''Creates a new component'''
if compPos < 0 or compPos > len(self.selectedComponents):
compPos = len(self.selectedComponents)
if len(self.selectedComponents) > 50:
return None
component = self.modules[moduleIndex].Component(
moduleIndex, compPos, self
)
self.selectedComponents.insert(
compPos,
component
)
self.componentListChanged()
# init component's widget for loading/saving presets
self.selectedComponents[compPos].widget(loader)
self.updateComponent(compPos)
if hasattr(loader, 'insertComponent'):
loader.insertComponent(compPos)
return compPos
def moveComponent(self, startI, endI):
comp = self.selectedComponents.pop(startI)
self.selectedComponents.insert(endI, comp)
self.componentListChanged()
return endI
def removeComponent(self, i):
self.selectedComponents.pop(i)
self.componentListChanged()
def clearComponents(self):
self.selectedComponents = list()
self.componentListChanged()
def updateComponent(self, i):
# print('updating %s' % self.selectedComponents[i])
self.selectedComponents[i].update()
def moduleIndexFor(self, compName):
try:
index = self.compNames.index(compName)
return self.moduleIndexes[index]
except ValueError:
for altName, modI in self.altCompNames:
if altName == compName:
return self.moduleIndexes[modI]
def clearPreset(self, compIndex):
self.selectedComponents[compIndex].currentPreset = None
def openPreset(self, filepath, compIndex, presetName):
'''Applies a preset to a specific component'''
saveValueStore = self.getPreset(filepath)
if not saveValueStore:
return False
try:
self.selectedComponents[compIndex].loadPreset(
saveValueStore,
presetName
)
except KeyError as e:
print('preset missing value: %s' % e)
self.savedPresets[presetName] = dict(saveValueStore)
return True
def getPresetDir(self, comp):
return os.path.join(
self.presetDir, str(comp), str(comp.version()))
def getPreset(self, filepath):
'''Returns the preset dict stored at this filepath'''
if not os.path.exists(filepath):
return False
with open(filepath, 'r') as f:
for line in f:
saveValueStore = Core.presetFromString(line.strip())
break
return saveValueStore
def openProject(self, loader, filepath):
''' loader is the object calling this method which must have
its own showMessage(**kwargs) method for displaying errors.
'''
if not os.path.exists(filepath):
loader.showMessage(msg='Project file not found.')
return
errcode, data = self.parseAvFile(filepath)
if errcode == 0:
try:
if hasattr(loader, 'window'):
for widget, value in data['WindowFields']:
widget = eval('loader.window.%s' % widget)
widget.blockSignals(True)
widget.setText(value)
widget.blockSignals(False)
for key, value in data['Settings']:
self.settings.setValue(key, value)
for tup in data['Components']:
name, vers, preset = tup
clearThis = False
modified = False
# add loaded named presets to savedPresets dict
if 'preset' in preset and preset['preset'] is not None:
nam = preset['preset']
filepath2 = os.path.join(
self.presetDir, name, str(vers), nam)
origSaveValueStore = self.getPreset(filepath2)
if origSaveValueStore:
self.savedPresets[nam] = dict(origSaveValueStore)
modified = not origSaveValueStore == preset
else:
# saved preset was renamed or deleted
clearThis = True
# create the actual component object & get its index
i = self.insertComponent(
-1,
self.moduleIndexFor(name),
loader)
if i is None:
loader.showMessage(msg="Too many components!")
break
try:
if 'preset' in preset and preset['preset'] is not None:
self.selectedComponents[i].loadPreset(
preset
)
else:
self.selectedComponents[i].loadPreset(
preset,
preset['preset']
)
except KeyError as e:
print('%s missing value: %s' % (
self.selectedComponents[i], e)
)
if clearThis:
self.clearPreset(i)
if hasattr(loader, 'updateComponentTitle'):
loader.updateComponentTitle(i, modified)
except:
errcode = 1
data = sys.exc_info()
if errcode == 1:
typ, value, tb = data
if typ.__name__ == 'KeyError':
# probably just an old version, still loadable
print('file missing value: %s' % value)
return
if hasattr(loader, 'createNewProject'):
loader.createNewProject(prompt=False)
import traceback
msg = '%s: %s\n\nTraceback:\n' % (typ.__name__, value)
msg += "\n".join(traceback.format_tb(tb))
loader.showMessage(
msg="Project file '%s' is corrupted." % filepath,
showCancel=False,
icon='Warning',
detail=msg)
def parseAvFile(self, filepath):
'''Parses an avp (project) or avl (preset package) file.
Returns dictionary with section names as the keys, each one
contains a list of tuples: (compName, version, compPresetDict)
'''
validSections = (
'Components',
'Settings',
'WindowFields'
)
data = {sect: [] for sect in validSections}
try:
with open(filepath, 'r') as f:
def parseLine(line):
'''Decides if a file line is a section header'''
line = line.strip()
newSection = ''
if line.startswith('[') and line.endswith(']') \
and line[1:-1] in validSections:
newSection = line[1:-1]
return line, newSection
section = ''
i = 0
for line in f:
line, newSection = parseLine(line)
if newSection:
section = str(newSection)
continue
if line and section == 'Components':
if i == 0:
lastCompName = str(line)
i += 1
elif i == 1:
lastCompVers = str(line)
i += 1
elif i == 2:
lastCompPreset = Core.presetFromString(line)
data[section].append((
lastCompName,
lastCompVers,
lastCompPreset
))
i = 0
elif line and section:
key, value = line.split('=', 1)
data[section].append((key, value.strip()))
return 0, data
except:
return 1, sys.exc_info()
def importPreset(self, filepath):
errcode, data = self.parseAvFile(filepath)
returnList = []
if errcode == 0:
name, vers, preset = data['Components'][0]
presetName = preset['preset'] \
if preset['preset'] else os.path.basename(filepath)[:-4]
newPath = os.path.join(
self.presetDir,
name,
vers,
presetName
)
if os.path.exists(newPath):
return False, newPath
preset['preset'] = presetName
self.createPresetFile(
name, vers, presetName, preset
)
return True, presetName
elif errcode == 1:
# TODO: an error message
return False, ''
def exportPreset(self, exportPath, compName, vers, origName):
internalPath = os.path.join(
self.presetDir, compName, str(vers), origName
)
if not os.path.exists(internalPath):
return
if os.path.exists(exportPath):
os.remove(exportPath)
with open(internalPath, 'r') as f:
internalData = [line for line in f]
try:
saveValueStore = Core.presetFromString(internalData[0].strip())
self.createPresetFile(
compName, vers,
origName, saveValueStore,
exportPath
)
return True
except:
return False
def createPresetFile(
self, compName, vers, presetName, saveValueStore, filepath=''):
'''Create a preset file (.avl) at filepath using args.
Or if filepath is empty, create an internal preset using args'''
if not filepath:
dirname = os.path.join(self.presetDir, compName, str(vers))
if not os.path.exists(dirname):
os.makedirs(dirname)
filepath = os.path.join(dirname, presetName)
internal = True
else:
if not filepath.endswith('.avl'):
filepath += '.avl'
internal = False
with open(filepath, 'w') as f:
if not internal:
f.write('[Components]\n')
f.write('%s\n' % compName)
f.write('%s\n' % str(vers))
f.write(Core.presetToString(saveValueStore))
def createProjectFile(self, filepath, window=None):
'''Create a project file (.avp) using the current program state'''
settingsKeys = [
'componentDir',
'inputDir',
'outputDir',
'presetDir',
'projectDir',
]
try:
if not filepath.endswith(".avp"):
filepath += '.avp'
if os.path.exists(filepath):
os.remove(filepath)
with open(filepath, 'w') as f:
print('creating %s' % filepath)
f.write('[Components]\n')
for comp in self.selectedComponents:
saveValueStore = comp.savePreset()
f.write('%s\n' % str(comp))
f.write('%s\n' % str(comp.version()))
f.write('%s\n' % Core.presetToString(saveValueStore))
f.write('\n[Settings]\n')
for key in self.settings.allKeys():
if key in settingsKeys:
f.write('%s=%s\n' % (key, self.settings.value(key)))
if window:
f.write('\n[WindowFields]\n')
f.write(
'lineEdit_audioFile=%s\n'
'lineEdit_outputFile=%s\n' % (
window.lineEdit_audioFile.text(),
window.lineEdit_outputFile.text()
)
)
return True
except:
return False
def loadEncoderOptions(self):
file_path = os.path.join(self.wd, 'encoder-options.json')
with open(file_path) as json_file:
self.encoder_options = json.load(json_file)
def findFfmpeg(self):
if sys.platform == "win32":
return "ffmpeg.exe"
else:
try:
with open(os.devnull, "w") as f:
sp.check_call(['ffmpeg', '-version'], stdout=f, stderr=f)
return "ffmpeg"
except:
return "avconv"
def readAudioFile(self, filename, parent):
command = [self.FFMPEG_BIN, '-i', filename]
try:
fileInfo = sp.check_output(command, stderr=sp.STDOUT, shell=False)
except sp.CalledProcessError as ex:
fileInfo = ex.output
pass
info = fileInfo.decode("utf-8").split('\n')
for line in info:
if 'Duration' in line:
d = line.split(',')[0]
d = d.split(' ')[3]
d = d.split(':')
duration = float(d[0])*3600 + float(d[1])*60 + float(d[2])
command = [
self.FFMPEG_BIN,
'-i', filename,
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-ar', '44100', # ouput will have 44100 Hz
'-ac', '1', # mono (set to '2' for stereo)
'-']
in_pipe = sp.Popen(
command, stdout=sp.PIPE, stderr=sp.DEVNULL, bufsize=10**8)
completeAudioArray = numpy.empty(0, dtype="int16")
progress = 0
lastPercent = None
while True:
if self.canceled:
break
# read 2 seconds of audio
progress = progress + 4
raw_audio = in_pipe.stdout.read(88200*4)
if len(raw_audio) == 0:
break
audio_array = numpy.fromstring(raw_audio, dtype="int16")
completeAudioArray = numpy.append(completeAudioArray, audio_array)
percent = int(100*(progress/duration))
if percent >= 100:
percent = 100
if lastPercent != percent:
string = 'Loading audio file: '+str(percent)+'%'
parent.progressBarSetText.emit(string)
parent.progressBarUpdate.emit(percent)
lastPercent = percent
in_pipe.kill()
in_pipe.wait()
# add 0s the end
completeAudioArrayCopy = numpy.zeros(
len(completeAudioArray) + 44100, dtype="int16")
completeAudioArrayCopy[:len(completeAudioArray)] = completeAudioArray
completeAudioArray = completeAudioArrayCopy
return completeAudioArray
def cancel(self):
self.canceled = True
def reset(self):
self.canceled = False
@staticmethod
def badName(name):
'''Returns whether a name contains non-alphanumeric chars'''
return any([letter in string.punctuation for letter in name])
@staticmethod
def presetToString(dictionary):
'''Alphabetizes a dict into OrderedDict & returns string repr'''
return repr(
OrderedDict(sorted(dictionary.items(), key=lambda t: t[0]))
)
@staticmethod
def presetFromString(string):
'''Turns a string repr of OrderedDict into a regular dict'''
return dict(eval(string))
@staticmethod
def appendUppercase(lst):
for form, i in zip(lst, range(len(lst))):
lst.append(form.upper())
return lst