Fire diamonds now generated with new, PGF-based nfpa_704 LaTeX package.
[chemdb.git] / chemdb / chemdb.py
1 # Copyright
2
3 """Utilities for chemical inventories.
4
5 Includes a CAS number validator and document generation.
6 """
7
8 import re
9 import os
10 import os.path
11 import time
12 import types
13 from sys import stdin, stdout, stderr
14
15 from .db.text import DBPrettyPrinter
16
17
18 def valid_CASno(cas_string, debug=False):
19     """Validate CAS numbers.
20
21     Check `N..NN-NN-N` format, and the `checksum digit`_ for valid CAS
22     number structure.  for
23
24     .. math::
25       N_n .. N_4 N_3 - N_2 N_1 - R
26
27     The checksum digit is
28
29     .. math::
30       R = remainder([sum_{i=1}^n i N_i  ] / 10 )
31
32     .. _checksum digit:
33       http://www.cas.org/expertise/cascontent/registry/checkdig.html
34
35     >>> valid_CASno('107-07-3')
36     True
37     >>> valid_CASno('107-08-3')
38     False
39     >>> valid_CASno('107-083')
40     False
41
42     Sometimes we don't have a CAS number, or a product will contain
43     secret, non-hazardous ingredients.  Therefore we treat the strings
44     `na` and `+secret-non-hazardous` as valid CAS numbers.
45
46     >>> valid_CASno('na')
47     True
48     >>> valid_CASno('+secret-non-hazardous')
49     True
50     """
51     if cas_string in ['na', '+secret-non-hazardous']:
52         return True
53     # check format,  
54     # \A matches the start of the string
55     # \Z matches the end of the string
56     regexp = re.compile('\A[0-9]{2,}[-][0-9]{2}[-][0-9]\Z')
57     if regexp.match(cas_string) == None:
58         if debug: print >> stderr, "invalid CAS# format: '%s'" % cas_string
59         return False
60     # generate check digit
61     casdigs = [int(d) for d in ''.join(cas_string.split('-'))]
62     sumdigs = casdigs[:-1]
63     sumdigs.reverse()
64     s = sum([(i+1)*d for i,d in enumerate(sumdigs)])
65     check = s % 10
66     if check == casdigs[-1]:
67         return True
68     elif debug:
69         print >> stderr, (
70             "invalid CAS# check: '%s' (expected %d)" % (cas_string, check))
71     return False
72
73 class MSDSManager (object):
74     """Manage Material Saftey Data Sheets (MSDSs).
75     """
76     def __init__(self, db, dir="./MSDS/"):
77         self.db = db
78         self.dir = dir
79         self.MIMEs = {
80             'application/pdf': ['pdf'],
81             'text/html': ['html'],
82             'text/plain': ['txt'],
83             }
84         self.check_dir()
85
86     def check_dir(self):
87         "Create the MSDS directory if it's missing."
88         if os.path.isdir(self.dir):
89             return # all set to go
90         elif os.path.exists(self.dir):
91             raise Exception, (
92                 'Error: a non-directory file exists at %s' % self.dir)
93         else:
94             os.mkdir(self.dir)
95
96     def basepath(self, id):
97         assert isinstance(id, int), (
98             'id must be an integer, not %s (%s)' % (type(id), str(id)))
99         return os.path.join(self.dir, "%d" % id)
100
101     def local_basepath(self, id) : # for symbolic links
102         assert isinstance(id, int), (
103             'id must be an integer, not %s (%s)' % (type(id), str(id)))
104         return "./%d" % id
105
106     def MIME_ext(self, mime):
107         if mime in self.MIMEs.keys():
108             return self.MIMEs[mime][0]
109         for values in self.MIMEs.values():
110             if mime in values:
111                 return mime
112         raise ValueError(
113             "invalid MIME type '%s'\nshould be one of %s" % (mime, self.MIMEs))
114
115     def path(self, id, mime):
116         return "%s.%s" % (self.basepath(id), self.MIME_ext(mime))
117
118     def local_path(self, id, mime):
119         return "%s.%s" % (self.local_basepath(id), self.MIME_ext(mime))
120
121     def save(self, id, filetext, mime='application/pdf'):
122         "Save the binary byte string FILE to the path for ID"
123         print >> file(self.path(id, mime), 'wb'), filetext,
124
125     def link(self, id, target_id):
126         # target_id already exists, create a symlink to it for id.
127         target_mime = self.get_MSDS_MIME(target_id)
128         target_path = self.local_path(target_id, target_mime)
129         path = self.path(id, target_mime)
130         #os.link(self.path(target_id), self.path(id))   # hard link...
131         os.symlink(target_path, path)                   # ... or soft link
132
133     def has_MSDS_MIME(self, id, mime):
134         """
135         >>> m = MSDSManager(db=None)
136         >>> print m.has_MSDS_MIME(102, 'pdf') # test on html
137         False
138         >>> print m.has_MSDS_MIME(102, 'html') # test on html
139         True
140         >>> print m.has_MSDS_MIME(6, 'pdf') # test on pdf symlink
141         True
142         """
143         return os.path.exists(self.path(id, mime))
144
145     def get_MSDS_path(self, id):
146         """
147         >>> m = MSDSManager(db=None)
148         >>> print m.get_MSDS_path(102) # test on html
149         ./MSDS/102.html
150         >>> print m.get_MSDS_path(1) # test on pdf
151         ./MSDS/1.pdf
152         >>> print m.get_MSDS_path(6) # test on pdf symlink
153         ./MSDS/6.pdf
154         """
155         for mime in self.MIMEs:
156             if self.has_MSDS_MIME(id, mime):
157                 return self.path(id, mime)
158         return None
159
160     def get_MSDS_MIME(self, id):
161         """
162         >>> m = MSDSManager(db=None)
163         >>> print m.get_MSDS_MIME(102) # test on html
164         text/html
165         >>> print m.get_MSDS_MIME(1) # test on pdf
166         application/pdf
167         >>> print m.get_MSDS_MIME(6) # test on pdf symlink
168         application/pdf
169         """
170         for mime in self.MIMEs:
171             if self.has_MSDS_MIME(id, mime):
172                 return mime
173         return None
174
175     def has_MSDS(self, id):
176         if self.get_MSDS_path(id) == None:
177             return False
178         return True
179
180     def get_all(self, simlinks=True):
181         ret = []
182         for record in self.db.records():
183             p = self.get_MSDS_path( int(record['ID']) )
184             if p != None:
185                 if simlinks == False and os.path.islink( p ):
186                     continue # ignore the symbolic link
187                 ret.append({'ID':record['ID'], 'Name':record['Name']})
188         return ret
189
190
191 class DocGen (object):
192     "Generate the officially required documents"
193     def __init__(self, db, doc_root=os.path.join('template', 'doc')):
194         self.db = db
195         self.doc_root = doc_root
196
197     def _latex_safe(self, string):
198         string = string.replace('%', '\%')
199         string = string.replace('>', '$>$')
200         string = string.replace('<', '$<$')
201         return string
202
203     def _set_main_target(self, target):
204         print >> file(os.path.join(self.doc_root, 'main.tex'), 'w'), (
205             """\documentclass[letterpaper]{article}
206
207 \input{%s}
208 """ % target)
209
210     def _make_pdf(self, target_file):
211         os.system('cd %s && make pdf' % self.doc_root)
212         path = os.path.join(self.doc_root, target_file)
213         os.system('cp %s %s' % (os.path.join(self.doc_root, 'main.pdf'), path))
214         return path
215
216     def inventory(self, title=None,
217                   namewidth='a', sort_field='db_id',
218                   valid_record=lambda r: r['Disposed'] == ''):
219         """Create a pdf list of all maching chemicals.  The default is to
220         match all currently owned chemicals.  Matching chemicals can be sorted
221         by any field (defaults to 'ID')."""
222         if title == None:
223             title == 'Inventory'
224         pp = DBPrettyPrinter(self.db)
225         active_ids = []
226         for record in self.db.records():
227             if valid_record(record) : # get ids for matching chemicals
228                 active_ids.append(record['db_id'])
229         active_ids.sort(cmp=lambda a,b: cmp(self.db.record(a)[sort_field],
230                                             self.db.record(b)[sort_field]))
231         active_fields = ['ID', 'Name', 'Amount',
232                          'H', 'F', 'R', 'O', 'M', 'C', 'T']
233         width = {}
234         for field in active_fields:
235             width[field] = 'a'
236         width['Name'] = namewidth
237         ## Plain text method
238         #string = "Chemical inventory:\t\tGenerated on %s\n\n" \
239         #         % time.strftime('%Y-%m-%d')
240         #string += pp.multi_record_string(active_ids, active_fields,
241         #                                 width=width, FS=' ')
242         # return string
243         ## Latex method
244         string = "\\begin{longtable}{l l l c c c c c c c}\n"
245         string += ('%% The header for the remaining page(s) of the table...\n'
246                    'ID & Name & Amount & H & F & R & O & M & C & T \\\\\n'
247                    '\\hline\n'
248                    '\\endhead\n')
249         for db_id in active_ids:
250             record = self.db.record(db_id)
251             string += "  %s & %s & %s & %s & %s & %s & %s & %s & %s & %s \\\\\n" \
252                       % (self._latex_safe(record['ID']),
253                          self._latex_safe(record['Name']),
254                          self._latex_safe(record['Amount']),
255                          self._latex_safe(record['H']),
256                          self._latex_safe(record['F']),
257                          self._latex_safe(record['R']),
258                          self._latex_safe(record['O']),
259                          self._latex_safe(record['M']),
260                          self._latex_safe(record['C']),
261                          self._latex_safe(record['T']))
262         string += "\\end{longtable}\n"
263         print >> file(os.path.join(self.doc_root, 'inventory_title.tex'), 'w'), title
264         print >> file(os.path.join(self.doc_root, 'inventory_data.tex'), 'w'), string
265         ## alter main.tex to point to the inventory template.
266         self._set_main_target('inventory_template')
267         ## run latex
268         path = self._make_pdf('inventory.pdf')
269         return path
270
271     def door_warning(self,
272                      valid_record=lambda r: r['Disposed'] == ''):
273         """create a warning NFPA diamond and list of the most dangerous
274         chemicals for which valid_record(record) is true.  For
275         example, to generate a door warning for the front door use
276           door_warning(lambda r: r['Disposed'] == '')
277         or to generate the warning for the fridge
278           door_warning(lambda r: r['Disposed'] == '' and r['Location'] == 'Refrigerator')
279         Note that valid_record defaults to the first example.
280         """
281         # Search the database to find the nasties
282         NFPA_maxs = {'H':0, 'F':0, 'R':0, 'O':[]}
283         Mutagens = []
284         Carcinogens = []
285         Teratogens = []
286         Healths = []
287         Fires = []
288         Reactivities = []
289         Others = []
290         for record in self.db.records():
291             if valid_record(record):
292                 for field in ['H', 'F', 'R', 'O']:
293                     r = record[field]
294                     if r != '' and r != '?':
295                         if field != 'O' and int(r) > NFPA_maxs[field]:
296                             NFPA_maxs[field] = int(r)
297                         elif field == 'O' and not r in NFPA_maxs['O']:
298                             NFPA_maxs[field].append(r)
299                 for field,array in zip(['M','C','T'],
300                                        [Mutagens,
301                                         Carcinogens,
302                                         Teratogens]):
303                     if record[field] != '' and record[field] != '?':
304                         array.append(record['db_id'])
305         # now that we've found the max NFPAs,
306         # find all the chemicals at those levels
307         for record in self.db.records():
308             if valid_record(record):
309                 for field,array in zip(['H', 'F', 'R', 'O'],
310                                        [Healths, Fires,
311                                         Reactivities, Others]):
312                     r = record[field]
313                     if r != '' and r != '?':
314                         if field != 'O' and int(r) == NFPA_maxs[field]:
315                             array.append(record['db_id'])
316                         elif field == 'O' and r in NFPA_maxs['O']:
317                             array.append(record['db_id'])
318
319         ## generate the output
320         # setup the NFPA grapic
321         oxstring = 'oxidizer' if 'OX' in NFPA_maxs['O'] else None
322         wstring  = 'nowater'  if 'W'  in NFPA_maxs['O'] else None
323         extra_specials = [
324             x for x in NFPA_maxs['O'] if x not in ['OX', 'W']]
325         esstring = None
326         if len(extra_specials) > 0:
327             esstring = 'special={%s}' % (
328                 ','.join([x for x in extra_specials]))
329         NFPA_maxs['special_args'] = ', '.join([
330                 x for x in [oxstring, wstring, esstring] if x != None])
331         string = """
332 \\begin{center}
333   \\Huge
334   \\firediamond{health=%(H)s, flammability=%(F)s, reactivity=%(R)s,
335                %(special_args)s}
336 \\end{center}
337 """ % NFPA_maxs
338         # now generate a list of the nasties ( Amount & ID & Name )
339         string += """
340 \\vspacer
341
342 \\contfont
343
344 \\begin{tabular}{r r l}
345 """
346         for field,name,array in zip(['H', 'F', 'R', 'O'],
347                                     ['Health', 'Fire',
348                                      'Reactivity', 'Other'],
349                                     [Healths, Fires,
350                                      Reactivities, Others]):
351             if (not hasattr(NFPA_maxs[field], '__len__')) \
352                     or len(NFPA_maxs[field]) > 0:
353                 string += "  \multicolumn{3}{c}{\Tstrut %s : %s} \\\\\n" \
354                     % (name, NFPA_maxs[field])
355             else : # Print "Other" instead of "Other : []"
356                 string += "  \multicolumn{3}{c}{\Tstrut %s} \\\\\n" \
357                     % (name)
358             for db_id in array:
359                 record = self.db.record(db_id)
360                 string += "  %s  &  %s &  %s \\\\\n" \
361                     % (self._latex_safe(record['Amount']),
362                        self._latex_safe(record['ID']),
363                        self._latex_safe(record['Name']))
364             if len(array) == 0:
365                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"
366         for hazard,array in zip(['Mutagens','Carcinogens','Teratogens'],
367                                 [Mutagens, Carcinogens, Teratogens]):
368             string += "  \multicolumn{3}{c}{\Tstrut %s} \\\\\n" % (hazard)
369             for db_id in array:
370                 record = self.db.record(db_id)
371                 string += "  %s  &  %s &  %s \\\\\n" \
372                     % (self._latex_safe(record['Amount']),
373                        self._latex_safe(record['ID']),
374                        self._latex_safe(record['Name']))
375             if len(array) == 0:
376                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"                
377         string += "\\end{tabular}\n"
378         print >> file(os.path.join(self.doc_root, 'door_data.tex'), 'w'), string
379         ## alter main.tex to point to the door template.
380         self._set_main_target('door_template')
381         ## run latex
382         path = self._make_pdf('door_warning.pdf')
383         return path
384
385
386 def _test():
387     import doctest
388     doctest.testmod()
389
390 if __name__ == "__main__":
391     _test()