mkogg.py: Fix 'self.get_mp4_metadata(self, source)'
[blog.git] / posts / Factor_analysis / color-ball.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Copyright (C) 2013 W. Trevor King <wking@drexel.edu>
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Lesser General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # Lesser General Public License for more details.
15 #
16 # You should have received a copy of the GNU Lesser General Public
17 # License along with this program.  If not, see
18 # <http://www.gnu.org/licenses/>.
19
20 """Understanding expectation maximization
21
22 You have a bag of red, green, and blue balls, from which you draw N
23 times with replacement and get n1 red, n2 green, and n3 blue balls.
24 The probability of any one combination of n1, n2, and n3 is given by
25 the multinomial distribution:
26
27   p(n1, n2, n3) = N! / (n1! n2! n3!)  p1^n1 p2^n2 p3^n3
28
29 From some outside information, we can parameterize this model in terms
30 of a single hidden variable p:
31
32   p1 = 1/4
33   p2 = 1/4 + p/4
34   p3 = 1/2 - p/4
35
36 If we are red/green colorblind, we only measure
37
38   m1 = n1 + n2
39   m2 = n3
40
41 What is p (the hidden variable)?  What were n1 and n2?
42 """
43
44 import numpy as _numpy
45
46
47 class BallBag (object):
48     """Color-blind ball drawings
49     """
50     def __init__(self, p=1):
51         self._p = p
52         self.pvals = [0.25, 0.25 + p/4., 0.5 - p/4.]
53
54     def draw(self, n=10):
55         """Draw `n` balls from the bag with replacement
56
57         Return (m1, m2), where m1 is the number of red or green balls
58         and m2 is the number of blue balls.
59         """
60         nvals = _numpy.random.multinomial(n=n, pvals=self.pvals)
61         m1 = sum(nvals[:2])  # red and green
62         m2 = nvals[2]
63         return (m1, m2)
64
65
66 class Analyzer (object):
67     def __init__(self, m1, m2):
68         self.m1 = m1
69         self.m2 = m2
70         self.p = self.E_n1 = self.E_n2 = None
71
72     def __call__(self):
73         pass
74
75     def print_results(self):
76         print('Results for {}:'.format(type(self).__name__))
77         for name,attr in [
78                 ('p', 'p'),
79                 ('E[n1|m1,m2]', 'E_n1'),
80                 ('E[n2|m1,m2]', 'E_n2'),
81                 ]:
82             print('  {}:  {}'.format(name, getattr(self, attr)))
83
84
85 class Naive (Analyzer):
86     """Simple analysis
87
88     With a large enough sample, the measured m1 and m2 give good
89     estimates for (p1 + p2) and p3.  You can use either of these to
90     solve for the unknown p, and then solve for E[n1] and E[n2].
91
92     While this is an easy approach for the colored ball example, it
93     doesn't generalize well to more complicated models ;).
94     """
95     def __call__(self):
96         N = self.m1 + self.m2
97         p1 = 0.25
98         p3 = self.m2 / float(N)
99         p2 = 1 - p1 - p3
100         self.p = 4*p2 - 1
101         self.E_n1 = p1 * N
102         self.E_n2 = p2 * N
103
104
105 class MaximumLikelihood (Analyzer):
106     """Analytical ML estimation
107
108     The likelihood of a general model θ looks like:
109     
110       L(x^N,θ) = sum_{n=1}^N log[p(x_n|θ)] + log[p(θ)]
111     
112     dropping the p(θ) term and applying to this situation, we have:
113     
114       L([m1,m2], p) = N! / (m1! m2!)  (p1 + p2)^m1 p3^m2
115                     = N! / (m1! m2!)  (1/2 + p/4)^m1 (1/2 - p/4)^m2
116     
117     which comes from recognizing that to a color-blind experimenter the
118     three ball colors are effectively two ball colors, so they'll have a
119     binomial distribution.  Maximizing the log-likelihood:
120     
121       log[L(m1,m2)] = log[N!…] + m1 log[1/2 + p/4] + m2 log[1/2 - p/4]
122       d/dp log[L] = m1/(1/2 + p/4)/4 + m2/(1/2 - p/4)/(-4) = 0
123       m1 (2 - p) = m2 (2 + p)
124       2 m1 - m1 p = 2 m2 + m2 p
125       (m1 + m2) p = 2 (m1 - m2)
126       p = 2 (m1 - m2) / (m1 + m2)
127     
128     Given this value of p, the the expected values of n1 and n2 are:
129     
130       E[n1|m1,m2] = p1 / (p1 + p2) * m1      # from the relative probabilities
131                   = (1/4) / (1/2 + p/4) * m1
132                   = m1 / (2 + p)
133                   = m1 / [2 + 2 (m1 - m2) / (m1 + m2)]
134                   = m1/2 * (m1 + m2) / (m1 + m2 + m1 - m2)
135                   = m1 * (m1 + m2) / (4 m1)
136                   = (m1 + m2) / 4
137       E[n2|m1,m2] = p2 / (p1 + p2) * m1
138                   = (1/4 + p/4) / (1/2 + p/4) * m1
139                   = m1 (1 + p) / (2 + p)
140                   = m1 [1 + 2 ((m1 - m2) / (m1 + m2)] /
141                        [2 + 2 (m1 - m2) / (m1 + m2)]
142                   = m1/2 * (m1 + m2 + 2m1 - 2m2) / (m1 + m2 + m1 - m2)
143                   = m1 * (3m1 - m2) / (4 m1)
144                   = (3m1 - m2) / 4
145     
146     So with a draw of m1 = 61 and m2 = 39, the ML estimates are:
147     
148       p = 0.44
149       E[n1|m1,m2] = 25.0
150       E[n2|m1,m2] = 36.0
151     """
152     def __call__(self):
153         N = self.m1 + self.m2
154         self.p = 2 * (self.m1 - self.m2) / float(N)
155         self.E_n1 = N / 4.
156         self.E_n2 = (3*self.m1 - self.m2) / 4.
157
158
159 class ExpectationMaximizer (Analyzer):
160     """Expectation maximization
161
162     Sometimes analytical ML is hard, so instead we iteratively
163     optimize:
164
165       Q(θ_t|θ_{t-1}) = E[log[p(x,y,θ_t)]|x,θ_{t-1}]
166
167     Applying to this situation, we have:
168
169       Q(p_t|p_{t-1}) = E[log[p([m1,m2],[n1,n2,n3],p_t)]|[m1,m2],p_{t-1}]
170
171     where:
172
173       p(m1,m2,n1,n2,n3,p) = δ(m1-n1-n2)δ(m2-n3)p(n1,n2,n3)
174
175     Plugging in and expanding the log:
176
177       Q(p_t|p_{t-1})
178         = E[log[δ(m1…)] + log[δ(m2…)] + log[p(n1,n2,n3,p_t)]
179             |m1,m2,p_{t-1}]
180         ≈ E[log[p(n1,n2,n3,p_t)]|m1,m2,p_{t-1}]  # drop boring δ terms
181         ≈ E[log[N!…] + n1 log[1/4] + n2 log[1/4 + p_t/4] + n3 log[1/2 - p_t/4]
182             |m1,m2,p_{t-1}]
183         ≈ E[n2 log[1/4 + p_t/4] + n3 log[1/2 - p_t/4]
184             |m1,m2,p_{t-1}]                  # drop non-p_t terms
185         ≈ E[n2|m1,m2,p_{t-1}] log[1/4 + p_t/4] + m2 log[1/2 - p_t/4]
186
187     Maximizing (the M step):
188
189       d/dp_t Q(p_t|p_{t-1})
190         ≈ E[n2|m1,m2,p_{t-1}] / (1/4 + p_t/4)/4 + m2 / (1/2 - p_t/4)/(-4)
191         = 0
192       E[n2|m1,m2,p_{t-1}] / (1 + p_t) = m2 / (2 - p_t)
193       E[n2|m1,m2,p_{t-1}] (2 - p_t) = m2 (1 + p_t)
194       p_t (E[n2|m1,m2,p_{t-1}] + m2) = 2 E[n2|m1,m2,p_{t-1}] - m2 
195       p_t = (2 E[n2|m1,m2,p_{t-1}] - m2) / (E[n2|m1,m2,p_{t-1}] + m2)
196
197     To get a value for p_t, we need to evaluate those expectations
198     (the E step).  Using a subset of the ML analysis:
199
200       E[n2|m1,m2,p_{t-1}] = m1 (1 + p_{t-1}) / (2 + p_{t-1})
201     """
202     def __init__(self, p=0, **kwargs):
203         super(ExpectationMaximizer, self).__init__(**kwargs)
204         self.p = 0    # prior belief
205
206     def _E_step(self):
207         """Caculate E[ni|m1,m2,p_{t-1}] given the prior parameter p_{t-1}
208         """
209         return {
210             'E_n1': m1 / (2. + self.p),
211             'E_n2': m1 * (1. + self.p) / (2. + self.p),
212             'E_n3': m2,
213             }
214
215     def _M_step(self, E_n1, E_n2, E_n3):
216         "Maximize Q(p_t|p_{t-1}) over p_t"
217         self.p = (2.*E_n2 - self.m2) / (E_n2 + self.m2)
218
219     def __call__(self, n=10):
220         for i in range(n):
221             print('   estimated p{}: {}'.format(i, self.p))
222             Es = self._E_step()
223             self._M_step(**Es)
224         for key,value in self._E_step().items():
225             setattr(self, key, value)
226
227
228 if __name__ == '__main__':
229     p = 0.6
230     bag = BallBag(p=p)
231     m1,m2 = bag.draw(n=100)
232     print('estimate p using m1 = {} and m2 = {}'.format(m1, m2))
233     for analyzer in [
234             ExpectationMaximizer(m1=m1, m2=m2),
235             MaximumLikelihood(m1=m1, m2=m2),
236             Naive(m1=m1, m2=m2),
237             ]:
238         analyzer()
239         analyzer.print_results()