added tasks fot onset, silence and cut
authorPaul Brossier <piem@altern.org>
Mon, 19 Dec 2005 21:24:51 +0000 (21:24 +0000)
committerPaul Brossier <piem@altern.org>
Mon, 19 Dec 2005 21:24:51 +0000 (21:24 +0000)
added tasks fot onset, silence and cut

python/aubio/tasks.py

index f6973f5252693086717db669f886819b230b68d1..cf33ceefcb31e24a7f1294ae4633673c7b2b9c87 100644 (file)
@@ -201,32 +201,44 @@ class taskparams:
                self.silence = -70
                self.derivate = False
                self.localmin = False
+               self.storefunc = False
                self.bufsize = 512
                self.hopsize = 256
                self.samplerate = 44100
                self.tol = 0.05
                self.step = float(self.hopsize)/float(self.samplerate)
                self.threshold = 0.1
-               self.mode = 'yin'
+               self.onsetmode = 'dual'
+               self.pitchmode = 'yin'
                self.omode = aubio_pitchm_freq
 
 class task(taskparams):
+       """ default template class to apply tasks on a stream """
        def __init__(self,input,output=None,params=None):
-               """ open the input file and initialize default argument """
+               """ open the input file and initialize default argument 
+               parameters should be set *before* calling this method.
+               """
                if params == None: self.params = taskparams()
                else: self.params = params
+               self.frameread = 0
+               self.readsize  = self.params.hopsize
                self.input     = input
                self.filei     = sndfile(self.input)
                self.srate     = self.filei.samplerate()
                self.channels  = self.filei.channels()
+               self.step      = float(self.srate)/float(self.params.hopsize)
+               self.myvec     = fvec(self.params.hopsize,self.channels)
                self.output    = output
-       def compute_step(self):
-               pass
+       def __call__(self):
+               self.readsize = self.filei.read(self.params.hopsize,self.myvec)
+               self.frameread += 1
+               
        def compute_all(self):
                """ Compute data """
                mylist    = []
                while(self.readsize==self.params.hopsize):
-                       mylist.append(self())
+                       tmp = self()
+                       if tmp: mylist.append(tmp)
                return mylist
 
        def eval(self,results):
@@ -237,17 +249,28 @@ class task(taskparams):
                """ Plot data """
                pass
 
+class tasksilence(task):
+       wassilence = 1
+       issilence  = 1
+       def __call__(self):
+               task.__call__(self)
+               if (aubio_silence_detection(self.myvec(),self.params.silence)==1):
+                       if self.wassilence == 1: self.issilence = 1
+                       else: self.issilence = 2
+                       self.wassilence = 1
+               else: 
+                       if self.wassilence <= 0: self.issilence = 0
+                       else: self.issilence = -1 
+                       self.wassilence = 0
+               if self.issilence == -1:
+                       return -1, self.frameread 
+               elif self.issilence == 2:
+                       return 2, self.frameread 
+
 class taskpitch(task):
-       #def __init__(self,input,output): 
-       #       pass
-       #       task.__init__(self,input)
-       #       #taskparams.__init__(self)
        def __init__(self,input,params=None):
                task.__init__(self,input,params=params)
-               self.myvec     = fvec(self.params.hopsize,self.channels)
-               self.frameread = 0
-               self.readsize  = self.params.hopsize
-               self.pitchdet  = pitchdetection(mode=get_pitch_mode(self.params.mode),
+               self.pitchdet  = pitchdetection(mode=get_pitch_mode(self.params.pitchmode),
                        bufsize=self.params.bufsize,
                        hopsize=self.params.hopsize,
                        channels=self.channels,
@@ -255,16 +278,16 @@ class taskpitch(task):
                        omode=self.params.omode)
 
        def __call__(self):
-               self.readsize = self.filei.read(self.params.hopsize,self.myvec)
-               freq = self.pitchdet(self.myvec)
                #print "%.3f     %.2f" % (now,freq)
-               self.frameread += 1
+               task.__call__(self)
+               freq = self.pitchdet(self.myvec)
                if (aubio_silence_detection(self.myvec(),self.params.silence)!=1):
                        return freq
                else: 
                        return -1.
 
        def gettruth(self):
+               """ big hack to extract midi note from /path/to/file.<midinote>.wav """
                return float(self.input.split('.')[-2])
                
 
@@ -280,9 +303,12 @@ class taskpitch(task):
                                res.append(i)
                                sum += i
                                num += 1
-               avg = aubio_freqtomidi(sum / float(num))
+               if num == 0: 
+                       avg = 0; med = 0
+               else:
+                       avg = aubio_freqtomidi(sum / float(num))
+                       med = aubio_freqtomidi(short_find(res,len(res)/2))
                avgdist = self.truth - avg
-               med = aubio_freqtomidi(short_find(res,len(res)/2))
                meddist = self.truth - med
                return avgdist, meddist
 
@@ -295,4 +321,184 @@ class taskpitch(task):
                        outplot=options.outplot)
 
 
+class taskonset(task):
+       def __init__(self,input,output=None,params=None):
+               """ open the input file and initialize arguments 
+               parameters should be set *before* calling this method.
+               """
+               task.__init__(self,input,params=params)
+               self.opick = onsetpick(self.params.bufsize,
+                       self.params.hopsize,
+                       self.channels,
+                       self.myvec,
+                       self.params.threshold,
+                       mode=get_onset_mode(self.params.onsetmode),
+                       derivate=self.params.derivate)
+               self.olist = [] 
+               self.ofunc = []
+               self.d,self.d2 = [],[]
+               self.maxofunc = 0
+               if self.params.localmin:
+                       ovalist   = [0., 0., 0., 0., 0.]
+
+       def __call__(self):
+               task.__call__(self)
+                isonset,val = self.opick.do(self.myvec)
+                if (aubio_silence_detection(self.myvec(),self.params.silence)):
+                        isonset=0
+                if self.params.storefunc:
+                        self.ofunc.append(val)
+                if self.params.localmin:
+                        if val > 0: ovalist.append(val)
+                        else: ovalist.append(0)
+                        ovalist.pop(0)
+                if (isonset == 1):
+                        if self.params.localmin:
+                                i=len(self.ovalist)-1
+                                # find local minima before peak 
+                                while self.ovalist[i-1] < self.ovalist[i] and i > 0:
+                                        i -= 1
+                                now = (self.frameread+1-i)
+                        else:
+                                now = self.frameread
+                        if now < 0 :
+                                now = 0
+                       return now, val 
+
+       def eval(self,lres):
+               from txtfile import read_datafile 
+               from onsetcompare import onset_roc
+               amode = 'roc'
+               vmode = 'verbose'
+               vmode = ''
+               for i in range(len(lres)): lres[i] = lres[i][0]*self.params.step
+               ltru = read_datafile(self.input.replace('.wav','.txt'),depth=0)
+               if vmode=='verbose':
+                       print "Running with mode %s" % self.params.mode, 
+                       print " and threshold %f" % self.params.threshold, 
+                       print " on file", input
+               #print ltru; print lres
+               if amode == 'localisation':
+                       l = onset_diffs(ltru,lres,self.params.tol)
+                       mean = 0
+                       for i in l: mean += i
+                       if len(l): print "%.3f" % (mean/len(l))
+                       else: print "?0"
+               elif amode == 'roc':
+                       self.orig, self.missed, self.merged, \
+                               self.expc, self.bad, self.doubled = \
+                               onset_roc(ltru,lres,self.params.tol)
+
+       def plot(self,onsets,ofunc):
+               import Gnuplot, Gnuplot.funcutils
+               import aubio.txtfile
+               import os.path
+               import numarray
+               from aubio.onsetcompare import onset_roc
+
+               self.lenofunc = len(ofunc) 
+               self.maxofunc = max(max(ofunc), self.maxofunc)
+               # onset detection function 
+               downtime = numarray.arange(len(ofunc))/self.step
+               self.d.append(Gnuplot.Data(downtime,ofunc,with='lines'))
+
+               # detected onsets
+               x1 = numarray.array(onsets)/self.step
+               y1 = self.maxofunc*numarray.ones(len(onsets))
+               self.d.append(Gnuplot.Data(x1,y1,with='impulses'))
+               self.d2.append(Gnuplot.Data(x1,-y1,with='impulses'))
+
+               # check if datafile exists truth
+               datafile = self.input.replace('.wav','.txt')
+               if datafile == self.input: datafile = ""
+               if not os.path.isfile(datafile):
+                       self.title = "truth file not found"
+                       t = Gnuplot.Data(0,0,with='impulses') 
+               else:
+                       t_onsets = aubio.txtfile.read_datafile(datafile)
+                       y2 = self.maxofunc*numarray.ones(len(t_onsets))
+                       x2 = numarray.array(t_onsets).resize(len(t_onsets))
+                       self.d2.append(Gnuplot.Data(x2,y2,with='impulses'))
+                       
+                       tol = 0.050 
 
+                       orig, missed, merged, expc, bad, doubled = \
+                               onset_roc(x2,x1,tol)
+                       self.title = "GD %2.3f%% FP %2.3f%%" % \
+                               ((100*float(orig-missed-merged)/(orig)),
+                                (100*float(bad+doubled)/(orig)))
+
+
+       def plotplot(self,outplot=None):
+               from aubio.gnuplot import gnuplot_init, audio_to_array, make_audio_plot
+               import re
+               # audio data
+               time,data = audio_to_array(self.input)
+               self.d2.append(make_audio_plot(time,data))
+               # prepare the plot
+               g = gnuplot_init(outplot)
+
+               g('set title \'%s %s\'' % (re.sub('.*/','',self.input),self.title))
+
+               g('set multiplot')
+
+               # hack to align left axis
+               g('set lmargin 15')
+
+               # plot waveform and onsets
+               g('set size 1,0.3')
+               g('set origin 0,0.7')
+               g('set xrange [0:%f]' % max(time)) 
+               g('set yrange [-1:1]') 
+               g.ylabel('amplitude')
+               g.plot(*self.d2)
+               
+               g('unset title')
+
+               # plot onset detection function
+               g('set size 1,0.7')
+               g('set origin 0,0')
+               g('set xrange [0:%f]' % (self.lenofunc/self.step))
+               g('set yrange [0:%f]' % (self.maxofunc*1.01))
+               g.xlabel('time')
+               g.ylabel('onset detection value')
+               g.plot(*self.d)
+
+               g('unset multiplot')
+
+class taskcut(task):
+       def __init__(self,input,slicetimes,params=None,output=None):
+               """ open the input file and initialize arguments 
+               parameters should be set *before* calling this method.
+               """
+               task.__init__(self,input,output=None,params=params)
+               self.newname   = "%s%s%09.5f%s%s" % (self.input.split(".")[0].split("/")[-1],".",
+                                       self.frameread/self.step,".",self.input.split(".")[-1])
+               self.fileo       = sndfile(self.newname,model=self.filei)
+               self.myvec       = fvec(self.params.hopsize,self.channels)
+               self.mycopy     = fvec(self.params.hopsize,self.channels)
+               self.slicetimes = slicetimes 
+
+       def __call__(self):
+               task.__call__(self)
+               # write to current file
+               if len(self.slicetimes) and self.frameread >= self.slicetimes[0]:
+                       self.slicetimes.pop(0)
+                       # write up to 1st zero crossing
+                       zerocross = 0
+                       while ( abs( self.myvec.get(zerocross,0) ) > self.params.zerothres ):
+                               zerocross += 1
+                       writesize = self.fileo.write(zerocross,self.myvec)
+                       fromcross = 0
+                       while (zerocross < self.readsize):
+                               for i in range(self.channels):
+                                       self.mycopy.set(self.myvec.get(zerocross,i),fromcross,i)
+                                       fromcross += 1
+                                       zerocross += 1
+                       del self.fileo
+                       self.fileo = sndfile("%s%s%09.5f%s%s" % 
+                               (self.input.split(".")[0].split("/")[-1],".",
+                               self.frameread/self.step,".",self.input.split(".")[-1]),model=self.filei)
+                       writesize = self.fileo.write(fromcross,self.mycopy)
+               else:
+                       writesize = self.fileo.write(self.readsize,self.myvec)