This repository has been archived on 2020-08-22. You can view files and clone it, but cannot push or open issues or pull requests.
pyaudviz/core.py

414 lines
14 KiB
Python

import sys
import io
import os
from PyQt4 import QtCore, QtGui, uic
from os.path import expanduser
import subprocess as sp
import numpy
from PIL import Image
#import tempfile
from shutil import rmtree
#import atexit
import time
from collections import OrderedDict
import json
from importlib import import_module
from PyQt4.QtGui import QDesktopServices
import string
class Core():
def __init__(self):
self.FFMPEG_BIN = self.findFfmpeg()
#self.tempDir = os.path.join(
# tempfile.gettempdir(), 'audio-visualizer-python-data')
#if not os.path.exists(self.tempDir):
# os.makedirs(self.tempDir)
#atexit.register(self.deleteTempDir)
self.dataDir = QDesktopServices.storageLocation(
QDesktopServices.DataLocation)
self.presetDir = os.path.join(self.dataDir, 'presets')
self.wd = os.path.dirname(os.path.realpath(__file__))
self.loadEncoderOptions()
self.findComponents()
self.selectedComponents = []
self.modifiedComponents = {}
def findComponents(self):
def findComponents():
srcPath = os.path.join(self.wd, 'components')
if os.path.exists(srcPath):
for f in sorted(os.listdir(srcPath)):
name, ext = os.path.splitext(f)
if name.startswith("__"):
continue
elif ext == '.py':
yield name
self.modules = [
import_module('components.%s' % name)
for name in findComponents()]
self.moduleIndexes = [i for i in range(len(self.modules))]
def componentListChanged(self):
for i, component in enumerate(self.selectedComponents):
component.compPos = i
# print('Modified Components: ', self.modifiedComponents)
def insertComponent(self, compPos, moduleIndex):
if compPos < 0:
compPos = len(self.selectedComponents) -1
component = self.modules[moduleIndex].Component(
moduleIndex, compPos)
self.selectedComponents.insert(
compPos,
component)
newDict = {}
for i, val in self.modifiedComponents.items():
if i >= compPos:
newDict[i+1] = bool(val)
else:
newDict[i] = bool(val)
self.modifiedComponents.clear()
self.modifiedComponents = newDict
self.componentListChanged()
return compPos
def moveComponent(self, startI, endI):
def insert(target, comp):
self.selectedComponents.insert(target, comp)
comp = self.selectedComponents.pop(startI)
insert(endI, comp)
try:
oldModified = self.modifiedComponents.pop(startI)
if endI in self.modifiedComponents:
self.modifiedComponents[startI] = \
self.modifiedComponents.pop(endI)
self.modifiedComponents[endI] = oldModified
except KeyError:
pass
self.componentListChanged()
return endI
def removeComponent(self, i):
self.selectedComponents.pop(i)
try:
self.modifiedComponents.pop(i)
except KeyError:
pass
newDict = {}
for index, val in self.modifiedComponents.items():
if index >= i:
newDict[index-1] = bool(val)
else:
newDict[index] = bool(val)
self.modifiedComponents.clear()
self.modifiedComponents = newDict
self.componentListChanged()
def updateComponent(self, i):
# print('updating %s' % self.selectedComponents[i])
self.selectedComponents[i].update()
def componentModified(self, i):
'''Triggered by mainwindow.updateComponentTitle()
Tracks temporary state of whether components are modified or not
for retrieval upon loading a project file'''
self.modifiedComponents[i] = True
def componentUnmodified(self, i):
try:
self.modifiedComponents.pop(i)
except KeyError:
pass
def moduleIndexFor(self, compName):
compNames = [mod.Component.__doc__ for mod in self.modules]
index = compNames.index(compName)
return self.moduleIndexes[index]
def openProject(self, loader, filepath):
'''loader is the object calling this method (mainwindow/command)
which implements an insertComponent method'''
errcode, data = self.parseAvFile(filepath)
if errcode == 0:
for i, tup in enumerate(data['Components']):
name, vers, preset = tup
loader.insertComponent(
self.moduleIndexFor(name), -1)
self.selectedComponents[-1].loadPreset(
preset)
if data['Modified'][i] == True:
self.componentModified(i)
loader.updateComponentTitle(i, True)
else:
loader.updateComponentTitle(i)
elif errcode == 1:
typ, value, _ = data
if typ.__name__ == KeyError:
# probably just an old version, still loadable
print('file missing value: %s' % value)
return
loader.createNewProject()
msg = '%s: %s' % (typ.__name__, value)
loader.showMessage(
msg="Project file '%s' is corrupted." % filepath,
showCancel=False,
icon=QtGui.QMessageBox.Warning,
detail=msg)
def parseAvFile(self, filepath):
'''Parses an avp (project) or avl (preset package) file.
Returns data usable by another method.'''
data = {}
try:
with open(filepath, 'r') as f:
def parseLine(line):
'''Decides if a given avp or avl line is a section header'''
validSections = ('Components', 'Modified')
line = line.strip()
newSection = ''
if line.startswith('[') and line.endswith(']') \
and line[1:-1] in validSections:
newSection = line[1:-1]
return line, newSection
section = ''
i = 0
for line in f:
line, newSection = parseLine(line)
if newSection:
section = str(newSection)
data[section] = []
continue
if line and section == 'Components':
if i == 0:
lastCompName = str(line)
i += 1
elif i == 1:
lastCompVers = str(line)
i += 1
elif i == 2:
lastCompPreset = Core.presetFromString(line)
data[section].append(
(lastCompName,
lastCompVers,
lastCompPreset)
)
i = 0
if line and section == 'Modified':
data[section].append(eval(line))
return 0, data
except:
return 1, sys.exc_info()
def importPreset(self, filepath):
errcode, data = self.parseAvFile(filepath)
returnList = []
if errcode == 0:
name, vers, preset = data['Components'][0]
presetName = preset['preset'] \
if preset['preset'] else os.path.basename(filepath)[:-4]
newPath = os.path.join(
self.presetDir,
name,
vers,
presetName
)
if os.path.exists(newPath):
return False, newPath
preset['preset'] = presetName
self.createPresetFile(
name, vers, presetName, preset
)
return True, presetName
elif errcode == 1:
# TODO: an error message
return False, ''
def exportPreset(self, exportPath, compName, vers, origName):
internalPath = os.path.join(self.presetDir, compName, str(vers), origName)
if not os.path.exists(internalPath):
return
if os.path.exists(exportPath):
os.remove(exportPath)
with open(internalPath, 'r') as f:
internalData = [line for line in f]
try:
saveValueStore = Core.presetFromString(internalData[0].strip())
self.createPresetFile(
compName, vers,
origName, saveValueStore,
exportPath
)
return True
except:
return False
def createPresetFile(
self, compName, vers, presetName, saveValueStore, filepath=''):
'''Create a preset file (.avl) at filepath using args.
Or if filepath is empty, create an internal preset using
the args for the filepath.'''
if not filepath:
dirname = os.path.join(self.presetDir, compName, str(vers))
if not os.path.exists(dirname):
os.makedirs(dirname)
filepath = os.path.join(dirname, presetName)
internal = True
else:
if not filepath.endswith('.avl'):
filepath += '.avl'
internal = False
with open(filepath, 'w') as f:
if not internal:
f.write('[Components]\n')
f.write('%s\n' % compName)
f.write('%s\n' % str(vers))
f.write(Core.presetToString(saveValueStore))
def createProjectFile(self, filepath):
'''Create a project file (.avp) using the current program state'''
try:
if not filepath.endswith(".avp"):
filepath += '.avp'
if os.path.exists(filepath):
os.remove(filepath)
with open(filepath, 'w') as f:
print('creating %s' % filepath)
f.write('[Components]\n')
for comp in self.selectedComponents:
saveValueStore = comp.savePreset()
f.write('%s\n' % str(comp))
f.write('%s\n' % str(comp.version()))
f.write('%s\n' % Core.presetToString(saveValueStore))
f.write('[Modified]\n')
for i in range(len(self.selectedComponents)):
if i in self.modifiedComponents:
f.write('%s\n' % repr(True))
else:
f.write('%s\n' % repr(False))
return True
except:
return False
def loadEncoderOptions(self):
file_path = os.path.join(self.wd, 'encoder-options.json')
with open(file_path) as json_file:
self.encoder_options = json.load(json_file)
def findFfmpeg(self):
if sys.platform == "win32":
return "ffmpeg.exe"
else:
try:
with open(os.devnull, "w") as f:
sp.check_call(['ffmpeg', '-version'], stdout=f, stderr=f)
return "ffmpeg"
except:
return "avconv"
def readAudioFile(self, filename, parent):
command = [self.FFMPEG_BIN, '-i', filename]
try:
fileInfo = sp.check_output(command, stderr=sp.STDOUT, shell=False)
except sp.CalledProcessError as ex:
fileInfo = ex.output
pass
info = fileInfo.decode("utf-8").split('\n')
for line in info:
if 'Duration' in line:
d = line.split(',')[0]
d = d.split(' ')[3]
d = d.split(':')
duration = float(d[0])*3600 + float(d[1])*60 + float(d[2])
command = [
self.FFMPEG_BIN,
'-i', filename,
'-f', 's16le',
'-acodec', 'pcm_s16le',
'-ar', '44100', # ouput will have 44100 Hz
'-ac', '1', # mono (set to '2' for stereo)
'-']
in_pipe = sp.Popen(
command, stdout=sp.PIPE, stderr=sp.DEVNULL, bufsize=10**8)
completeAudioArray = numpy.empty(0, dtype="int16")
progress = 0
lastPercent = None
while True:
if self.canceled:
break
# read 2 seconds of audio
progress = progress + 4
raw_audio = in_pipe.stdout.read(88200*4)
if len(raw_audio) == 0:
break
audio_array = numpy.fromstring(raw_audio, dtype="int16")
completeAudioArray = numpy.append(completeAudioArray, audio_array)
percent = int(100*(progress/duration))
if percent >= 100:
percent = 100
if lastPercent != percent:
string = 'Loading audio file: '+str(percent)+'%'
parent.progressBarSetText.emit(string)
parent.progressBarUpdate.emit(percent)
lastPercent = percent
in_pipe.kill()
in_pipe.wait()
# add 0s the end
completeAudioArrayCopy = numpy.zeros(
len(completeAudioArray) + 44100, dtype="int16")
completeAudioArrayCopy[:len(completeAudioArray)] = completeAudioArray
completeAudioArray = completeAudioArrayCopy
return completeAudioArray
#def deleteTempDir(self):
# try:
# rmtree(self.tempDir)
# except FileNotFoundError:
# pass
def cancel(self):
self.canceled = True
def reset(self):
self.canceled = False
@staticmethod
def badName(name):
'''Returns whether a name contains non-alphanumeric chars'''
return any([letter in string.punctuation for letter in name])
@staticmethod
def presetToString(dictionary):
'''Alphabetizes a dict into OrderedDict & returns string repr'''
return repr(OrderedDict(sorted(dictionary.items(), key=lambda t: t[0])))
@staticmethod
def presetFromString(string):
'''Turns a string repr of OrderedDict into a regular dict'''
return dict(eval(string))