Package pycv :: Package cs :: Package ml :: Package cla :: Package boost :: Module cascade
[hide private]
[frames | no frames]

Source Code for Module pycv.cs.ml.cla.boost.cascade

  1  # PyCV - A Computer Vision Package for Python Incorporating Fast Training of Face Detection 
  2   
  3  # Copyright 2007 Nanyang Technological University, Singapore. 
  4  # Authors: Minh-Tri Pham, Viet-Dung D. Hoang, and Tat-Jen Cham. 
  5   
  6  # This file is part of PyCV. 
  7   
  8  # PyCV is free software: you can redistribute it and/or modify 
  9  # it under the terms of the GNU General Public  
 10  # License as published by the Free Software Foundation, either version  
 11  # 3 of the License, or (at your option) any later version. 
 12   
 13  # PyCV is distributed in the hope that it will be useful, 
 14  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  # GNU General Public License for more details. 
 17   
 18  # You should have received a copy of the GNU General Public License 
 19  # along with this program.  If not, see <http://www.gnu.org/licenses/>. 
 20   
 21  # --------------------------------------------------------------------- 
 22  #!/usr/bin/env python 
 23   
 24   
 25  __all__ = ['SimpleCascade','train_SC','DirectFilteringCascade','train_DFC', 
 26      'GeneralizedCascade'] 
 27   
 28  from math import log, exp, sqrt 
 29  from numpy import array, where, zeros, ones, dot, prod, concatenate 
 30  from numpy import exp as NP_exp 
 31  from numpy import sqrt as NP_sqrt 
 32   
 33  from adaboost import train_PC 
 34  from pycv.cs.ml.cla import BinaryClassifier, WeightedCDataset, train_NBClassifier 
 35  from pycv import tprint 
 36   
 37   
 38  #------------------------------------------------------------------------------- 
 39  # A simple cascade of binary classifiers. The cascade predicts true iff all 
 40  # binary classifiers predict true 
 41  #------------------------------------------------------------------------------- 
 42   
class SimpleCascade(BinaryClassifier):
    """Conjunction of binary classifiers.

    An input point is accepted (1) only when every classifier in the chain
    accepts it; the first classifier whose prediction equals 0 rejects it.
    """

    def __init__(self, *binary_classifiers):
        """Create the cascade from zero or more classifiers, in call order."""
        BinaryClassifier.__init__(self)
        self.bclassifiers = [stage for stage in binary_classifiers]

    def add_binary_classifier(self, binary_classifier):
        """Append one more classifier to the end of the cascade."""
        self.bclassifiers += [binary_classifier]

    def predict(self, input_point, *args, **kwds):
        """Return 0 if any classifier predicts 0 for the point, else 1.

        Classifiers are consulted in order and evaluation stops at the first
        rejection. Extra positional and keyword arguments are forwarded
        unchanged to every classifier's ``predict``.
        """
        rejected = any(
            stage.predict(input_point, *args, **kwds) == 0
            for stage in self.bclassifiers
        )
        return 0 if rejected else 1
60 61
def train_SC(cd, trainfunc, maxM, maxFR=0.9):
    """Train a SimpleCascade of at most maxM filters (binary classifiers).

    Input:
        cd: a WeightedCDataset -- modified in place (class 0 is filtered
            after every trained filter)
        trainfunc: a function that takes a WeightedCDataset as input
            and returns a BinaryClassifier
        maxM: the maximum number of filters to be trained
        maxFR: if the filtering rate (the number of negatives after training
            divided by the number of negatives before training) rises above
            'maxFR' then stop
    Output:
        cascade: a SimpleCascade of the trained filters
        new_classification_dataset: the same 'cd' object, now filtered by
            the cascade, useful for subsequent training
    """
    # The original called an undefined name `Cascade`; the cascade class
    # defined in this module is SimpleCascade.
    cascade = SimpleCascade()
    tprint("Training the cascade...")
    for i in range(maxM):
        tprint("Training the "+str(i+1)+"th filter with instances="+str(cd.nspc))
        bc = trainfunc(cd)
        cascade.add_binary_classifier(bc)

        # Keep only the class-0 samples that the new filter still accepts.
        # cd.dofilter is used here as returning the filtering rate --
        # NOTE(review): confirm against WeightedCDataset.dofilter.
        testres = bc.test(cd.input_data[0]).astype('bool')
        FR = cd.dofilter(0, testres)
        tprint("Filtering rate="+str(FR))
        if FR > maxFR:
            # The last filter removed too few negatives; stop adding filters.
            break
    tprint("Instances after training the cascade="+str(cd.nspc))
    return (cascade, cd)
92 93 94 #------------------------------------------------------------------------------- 95 # Cascade with scores, abstract class 96 #-------------------------------------------------------------------------------
class ScoredCascade(BinaryClassifier):
    """Abstract base class for cascades that attach a score to each decision.

    Subclasses must implement score(); scores(), generate_negatives() and
    predict() are all derived from it.
    """

    def __init__(self):
        # NOTE(review): the body of this constructor is missing from the
        # listing; chaining to the base initialiser matches what
        # SimpleCascade and NestedCascade do in this module.
        BinaryClassifier.__init__(self)

    def score(self, input_point, *args, **kwds):
        """Compute the score of an input point. Must be overridden.

        Input:
            input_point: an input point
        Output:
            z: (boolean) whether it passes the cascade
            y: predicted class
            s: the score value
        Raises:
            NotImplementedError: always, in this abstract base class
        """
        # Fail loudly instead of returning None, which would only surface
        # later as an unpacking error inside scores()/predict().
        raise NotImplementedError("ScoredCascade subclasses must implement score()")

    def scores(self, input_data, *args, **kwds):
        """An array version of score(). Take an array of input points and
        produce their scores.

        Input:
            input_data: an array of input points
        Output:
            z: a boolean array telling whether the points pass the cascade
            y: an int array of predicted classes
            s: a double array of score values
        """
        N = len(input_data)
        z = zeros(N, 'bool')
        y = zeros(N, 'int')
        s = zeros(N, 'double')
        for i in range(N):
            z[i], y[i], s[i] = self.score(input_data[i], *args, **kwds)
        return z, y, s

    def generate_negatives(self, nsamples, cdgenerator, *args, **kwds):
        """Generate 'nsamples' negative samples that pass the cascade and
        have strictly positive scores (a score of exactly 0 is discarded).

        Candidates are drawn one at a time until one is accepted, so this may
        take a long time -- or never return -- when samples that pass the
        cascade are rare.

        Input:
            nsamples: number of negative samples to be generated
            cdgenerator: a CDGenerator that can generate negative samples
        Output:
            train_neg: a numpy.array of newly generated samples
            scores_neg: an array of their scores
        """
        train_neg = zeros((nsamples,)+cdgenerator.input_shape, cdgenerator.dtype)
        scores_neg = zeros(nsamples)

        for n in range(nsamples):
            while True:
                # generate(0): presumably 0 selects the negative class --
                # TODO confirm against the CDGenerator interface.
                ipoint = cdgenerator.generate(0)
                (z, y, s) = self.score(ipoint, *args, **kwds)
                if z and s > 0:
                    break
            train_neg[n] = ipoint
            scores_neg[n] = s

        return (train_neg, scores_neg)

    def predict(self, input_point, *args, **kwds):
        """Return only the predicted class computed by score()."""
        (z, y, s) = self.score(input_point, *args, **kwds)
        return y
158 159 160 161 #------------------------------------------------------------------------------- 162 # A cascade of binary classifiers that propagates the difficulty over the 163 # binary classifiers. 164 #------------------------------------------------------------------------------- 165
class DirectFilteringCascade(ScoredCascade):
    r"""
    A cascaded rejector of M*N weak rejectors f_1(x), ..., f_{M*N}(x).

    At the m-th weak rejector, let:
        F_m(x) = \sum_{i=1}^m c_i f_i(x),
    where
        c_i: voting weight
        f_i(x): a binary classifier outputing {-1,1} (a weak classifier
            whose predict() is > 0 votes +1, anything else votes -1)

    At any stage m \in {N, 2*N, ..., M*N}, if F_m(x) < 0 then classify as 0.
    Else classify as 1.
    """

    def __init__(self, M, N, weaks, c):
        """Store the cascade layout.

        Input:
            M: the number of stages
            N: the number of weak classifiers per stage
            weaks: a sequence of weak classifiers, indexed 0 .. M*N-1
            c: a sequence of voting weights, indexed like 'weaks'
        """
        ScoredCascade.__init__(self)
        self.M = M
        self.N = N
        self.weaks = weaks
        self.c = c

    def score(self, input_point, *args, **kwds):
        """Run the point through the cascade.

        Output:
            z: False if the point was rejected at a decisive stage
            y: 0 if rejected, 1 otherwise
            s: the accumulated score at the point of rejection, or the
                final score F_{M*N}(x) if the point passed
        """
        c0 = 0
        # The original referenced bare M and N, which are not defined in
        # this scope; they are instance attributes.
        for m in range(self.M * self.N):
            if self.weaks[m].predict(input_point, *args, **kwds) > 0:
                c0 += self.c[m]
            else:
                c0 -= self.c[m]
            # m is 0-based, so c0 now holds F_{m+1}(x); the decisive stages
            # are those where a whole group of N weak classifiers has voted.
            if (m + 1) % self.N == 0 and c0 < 0:
                return (False, 0, c0)
        return (True, 1, c0)
196 197
def train_DFC(cd, cdgenerator, initfunc, trainfunc, M, N, alpha):
    """
    Train a DirectFilteringCascade using my method in my notebook dated 2 May 2007.

    Goal: to train a cascade such that at decisive stages, FAR is minimized
        and FRR is upper-bounded by alpha.
    Input:
        cd: a ScoredWCDataset of 2 classes -- modified in place
        cdgenerator: a CDataGenerator for the 'cd'
        initfunc(cd): a function that takes WeightedCDataset 'cd',
            initializes some data, and returns 'data'
        trainfunc(data,minDR): a function that takes initalized 'data'
            and constant 'minDR' as input and returns a BinaryClassifier. The
            function must be able to train to have at least 'minDR' detection
            rate.
        M: the maximum number of stages
        N: the number of weak classifiers per stage
        alpha: the maximum value of an exponential-based upper bound of FRR,
            0 <= alpha <= 1
    Output:
        a DirectFilteringCascade (None if M is 0)
    """
    # NOTE(review): the loop nesting below was reconstructed from a flattened
    # listing. Weak-classifier training and score updates are placed in the
    # per-weak loop; error measurement, filtering and negative regeneration
    # are placed at stage level -- confirm against the original source.
    cascade = None

    weaks = []
    cs = []
    tprint("Training a cascaded rejector with at most "+str(M)+" weak rejectors...")
    for m in range(M):
        # train the new stage
        tprint("Training the "+str(m+1)+"th stage...")

        for n in range(N):
            cd.update_weights()
            data = initfunc(cd)
            w0 = cd.weights[0]

            # start with a guess of c (setting maxFRR to 1)
            c = -log(alpha)+0.00001
            bcrit = 10000000

            while True:
                # given c, train weak classifier f(z) where
                #    z = maxFRR
                #    f(z)= maxFAR(maxFRR)
                maxFRR = (alpha - exp(-c)) / (exp(c) - exp(-c))
                f = trainfunc(data, 1-maxFRR)

                # estimate maxFAR as the weighted fraction of class-0
                # samples that f still accepts
                err0 = f.test(cd.input_data[0])
                maxFAR = w0[err0.astype('bool')].sum() / w0.sum()

                # compare with the current best; stop once the criterion
                # no longer improves by at least 0.5%
                crit = exp(-c)*(1-maxFAR) + exp(c)*maxFAR
                if crit*1.005 < bcrit:
                    bcrit = crit
                    bc = c
                    bmaxFRR = maxFRR
                    bmaxFAR = maxFAR
                    bf = f
                else:
                    break

                # given maxFRR, maxFAR, train c within the admissible
                # interval [c1, c2]
                delta = alpha*alpha - 4*bmaxFRR*(1-bmaxFRR)
                if delta < 0:
                    break
                c1 = log(0.5*(alpha-sqrt(delta))/maxFRR)
                if c1 < 0: c1 = 0
                c2 = log(0.5*(alpha+sqrt(delta))/maxFRR)
                if c1 > c2:
                    break

                # Unconstrained optimum is 0.5*log((1-maxFAR)/maxFAR); at
                # maxFAR == 0 or 1 that is +/- infinity, so take the clamped
                # limit directly instead of hitting a division/log error.
                if maxFAR <= 0:
                    c = c2
                elif maxFAR >= 1:
                    c = c1
                else:
                    c = 0.5 * log((1-maxFAR)/maxFAR)
                    if c < c1: c = c1
                    if c > c2: c = c2

            # append to the boost
            weaks.append(bf)
            cs.append(bc)

            # Report the values belonging to the selected classifier bf
            # (the original printed the last-tried maxFAR instead).
            tprint("Weak filter: (c,maxFRR,maxFAR)=("+str(bc)+","+str(bmaxFRR)+
                ","+str(bmaxFAR)+")")

            # estimate errors
            err = [bf.test(cd.input_data[j]) != j for j in range(2)]

            # update the scores: +bc where bf errs, -bc where it is correct
            # NOTE(review): for class 1 a correct prediction therefore lowers
            # the score, while the filter below keeps scores >= 0 for both
            # classes -- confirm the sign convention of cd.scores.
            for j in range(2): cd.scores[j] += err[j]*(2*bc)-bc

        # measure errors (float() avoids integer floor division on Python 2)
        FAR = (cd.scores[0] >= 0).sum() / float(cd.nspc[0])
        FRR = (cd.scores[1] < 0).sum() / float(cd.nspc[1])
        tprint("Measured errors: (FRR,FAR)=("+str(FRR)+","+str(FAR)+")")

        # filter negatively rejected data
        tprint("Filtering training samples:")
        for j in range(2):
            ar = cd.dofilter(j, cd.scores[j] >= 0)
            tprint("  Acceptance rate for class "+str(j)+"="+str(ar))

        # build up the current cascade; m is 0-based, so m+1 stages
        # (and (m+1)*N weak classifiers) have been trained so far
        cascade = DirectFilteringCascade(m+1, N, weaks, array(cs))

        # generate new negative samples to rebalance the two classes
        if cd.nspc[0] < cd.nspc[1]:
            Nnew = cd.nspc[1]-cd.nspc[0]
            tprint("needs "+str(Nnew)+" more nonfaces")
            (tn, sn) = cascade.generate_negatives(Nnew, cdgenerator)
            cd.concat(0, tn, sn)

        tprint("nspc="+str(cd.nspc))

    return cascade
312 313
class GeneralizedCascade(ScoredCascade):
    r"""A cascade of M weak binary classifiers. Each classifier f_m(x) \in
    {-1,+1} is associated with a score c_m, and a boolean value z_m which
    tells whether the weak classifier is 'an improver' (z_m = 1) or 'a filter'
    (z_m = 0) (to be explained below). Define the total scores of point x
    up to stage m:
        F_m(x) = \sum_{i=1}^m z_i c_i f_i(x)
    An improver at stage m does not classify x, but improves F_m(x)
    to reach a certain goal (e.g. to minimize the total classification error up
    to stage m). A filter, on the other hand, classifies x as negative
    immediately iff:
        G_m(x) = F_{m-1}(x) + c_m f_m(x) < 0
    Or pass x to the next stage. Finally, the cascade classifies x as
    positive iff F_M(x) >= 0, and negative otherwise.
    """

    def __init__(self):
        """Start with a single entry: a plain BinaryClassifier registered as
        an improver with weight -1."""
        ScoredCascade.__init__(self)
        self.weaks = [BinaryClassifier()]
        self.c = array([-1], dtype='double')
        self.z = ones(1, dtype='bool')

    def add_binary_classifier(self, binary_classifier, c, is_improver):
        """Append a weak classifier with its weight 'c' and its role flag
        (true for an improver, false for a filter)."""
        self.weaks.append(binary_classifier)
        self.c = concatenate([self.c, [c]])
        self.z = concatenate([self.z, [is_improver]])

    def score(self, input_point, *args, **kwds):
        """Evaluate the cascade on one point.

        Each prediction p is turned into a vote 2*p - 1 and weighted by c.
        Output:
            z: False if a filter rejected the point, True otherwise
            y: 0 on rejection, otherwise whether the total score is >= 0
            s: the score that triggered the rejection, or the total score
        """
        total = 0
        for idx, weak in enumerate(self.weaks):
            vote = weak.predict(input_point, *args, **kwds)
            candidate = total + self.c[idx] * (vote*2 - 1)
            if self.z[idx]:
                # improver: its weighted vote is folded into the running total
                total = candidate
            elif candidate < 0:
                # filter: rejects immediately, never changes the running total
                return (False, 0, candidate)
        return (True, total >= 0, total)
351 352 353 354 355 #------------------------------------------------------------------------------- 356 # A nested cascade of boosted classifiers. The cascade predicts true iff all 357 # binary classifiers predict true. Succeeding classifiers use 358 # the confidence value of previous classifiers to help improve 359 # their performances. 360 #------------------------------------------------------------------------------- 361
class NestedCascade(BinaryClassifier):
    """Nested Cascade

    Unlike the cascades of Bo Wu or of Xiao et al., this nested cascade
    applies an additional threshold at each chain connector to adapt to
    the new problem.

    Description to be put in here.
    """

    def __init__(self, is_nested=True):
        """Create an empty cascade: no classifiers and no thresholds yet."""
        BinaryClassifier.__init__(self)
        self.thresholds = list()
        self.crbclassifiers = list()
        self.is_nested = is_nested
376 377
def main():
    """Smoke test: train on a tiny two-class dataset via train_PC and print
    the test output for each class."""
    samples = [
        array([[0, 1], [1, 1], [1, 0]]),
        array([[0, 2], [1, 2], [2, 2], [2, 1], [2, 0]]),
    ]
    dataset = WeightedCDataset(samples)
    classifier = train_PC(dataset, train_NBClassifier, 10, 10000)
    for class_points in samples:
        tprint(classifier.test(class_points))


if __name__ == '__main__':
    main()