Added valid_record option to door_warning()
[chemdb.git] / chem_db.py
1 #!/usr/bin/python
2
3 """
4 Extend text_db with a CAS# validator, a command line interface, and document generation.
5 """
6
7 from text_db import *
8 import re
9 import os
10 import os.path
11 import time
12
13 def valid_CASno(cas_string, debug=False):
14     """
15     Check N..NN-NN-N format, and the checksum digit for valid CAS number structure.
16     see http://www.cas.org/expertise/cascontent/registry/checkdig.html
17     for N_n .. N_4 N_3 - N_2 N_1 - R
18     R = remainder([sum_{i=1}^n i N_i  ] / 10 )
19     Ignores 'na' and '+secret-non-hazardous'
20     >>> valid_CASno('107-07-3')
21     True
22     >>> valid_CASno('107-08-3')
23     False
24     >>> valid_CASno('107-083')
25     False
26     """
27     for string in ['na', '+secret-non-hazardous']:
28         # the first marks a non-existent CAS#
29         # the last marks items with secret, non-hazardous ingredients for which we have no CAS#
30         if cas_string == string:
31             return True
32     # check format,  
33     # ^ matches the start of the string
34     # \Z matches the end of the string
35     regexp = re.compile('^[0-9]{2,}[-][0-9]{2}[-][0-9]\Z')
36     if regexp.match(cas_string) == None:
37         if debug : print >> stderr, "invalid CAS# format: '%s'" % cas_string
38         return False
39     # generate check digit
40     casdigs = "".join(cas_string.split('-')) # remove '-'
41     sumdigs = list(casdigs[:-1])
42     sumdigs.reverse()
43     sum=0
44     for i in range(len(sumdigs)) :
45         sum += (i+1)*int(sumdigs[i])
46     check = sum % 10
47     if int(casdigs[-1]) == check :
48         return True
49     else :
50         if debug : print >> stderr, "invalid CAS# check: '%s' (expected %d)" % (cas_string, check)
51         return False
52
53 class MSDS_manager (object) :
54     """
55     Manage Material Saftey Data Sheets (MSDSs)
56     """
57     def __init__(self, db, dir="./MSDS/") :
58         self.db = db
59         self.dir = dir
60         self.MIMEs = ['application/pdf',
61                       'text/html',
62                       'text/plain']
63         self.MIME_exts = ['pdf', 'html', 'txt']
64         self.check_dir()
65     def check_dir(self) :
66         "Create the MSDS directory if it's missing"
67         if os.path.isdir(self.dir) :
68             return # all set to go
69         elif os.path.exists(self.dir) :
70             raise Exception, "Error: a non-directory file exists at %s" % self.dir
71         else :
72             os.mkdir(self.dir)
73     def basepath(self, id) :
74         assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
75                                     % (type(id), str(id))
76         return os.path.join(self.dir, "%d" % id)
77     def local_basepath(self, id) : # for symbolic links
78         assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
79                                     % (type(id), str(id))
80         return "./%d" % id
81     def MIME_ext(self, mime) :
82         assert mime in self.MIMEs, \
83             "invalid MIME type '%s'\nshould be one of %s" % (mime, self.MIMEs)
84         i = self.MIMEs.index(mime)
85         ext = self.MIME_exts[i]
86         return ext
87     def path(self, id, mime) :
88         return "%s.%s" % (self.basepath(id), self.MIME_ext(mime))
89     def local_path(self, id, mime) :
90         return "%s.%s" % (self.local_basepath(id), self.MIME_ext(mime))
91     def save(self, id, filetext, mime='application/pdf') :
92         "Save the binary byte string FILE to the path for ID"
93         print >> file(self.path(id, mime), 'wb'), filetext,
94     def link(self, id, target_id) :
95         # target_id already exists, create a symlink to it for id.
96         target_mime = self.get_MSDS_MIME(target_id)
97         target_path = self.local_path(target_id, target_mime)
98         path = self.path(id, target_mime)
99         #os.link(self.path(target_id), self.path(id))   # hard link...
100         os.symlink(target_path, path)                   # ... or soft link
101     def has_MSDS_MIME(self, id, mime) :
102         """
103         >>> m = MSDS_manager(db=None)
104         >>> print m.has_MSDS_type(102, 'pdf') # test on html
105         False
106         >>> print m.has_MSDS_type(102, 'html') # test on html
107         True
108         >>> print m.has_MSDS_type(6, 'pdf') # test on pdf symlink
109         True
110         """
111         return os.path.exists(self.path(id, mime))
112     def get_MSDS_path(self, id) :
113         """
114         >>> m = MSDS_manager(db=None)
115         >>> print m.get_MSDS_path(102) # test on html
116         ./MSDS/102.html
117         >>> print m.get_MSDS_path(1) # test on pdf
118         ./MSDS/1.pdf
119         >>> print m.get_MSDS_path(6) # test on pdf symlink
120         ./MSDS/6.pdf
121         """
122         for mime in self.MIMEs :
123             if self.has_MSDS_MIME(id, mime) :
124                 return self.path(id, mime)
125         return None
126     def get_MSDS_MIME(self, id) :
127         """
128         >>> m = MSDS_manager(db=None)
129         >>> print m.get_MSDS_MIME(102) # test on html
130         text/html
131         >>> print m.get_MSDS_MIME(1) # test on pdf
132         application/pdf
133         >>> print m.get_MSDS_MIME(6) # test on pdf symlink
134         application/pdf
135         """
136         for mime in self.MIMEs :
137             if self.has_MSDS_MIME(id, mime) :
138                 return mime
139         return None
140     def has_MSDS(self, id) :
141         if self.get_MSDS_path(id) == None :
142             return False
143         return True
144     def get_all(self, simlinks=True) :
145         ret = []
146         for record in self.db.records() :
147             p = self.get_MSDS_path( int(record['ID']) )
148             if p != None :
149                 if simlinks == False and os.path.islink( p ) :
150                     continue # ignore the symbolic link
151                 ret.append({'ID':record['ID'], 'Name':record['Name']})
152         return ret
153
154 class docgen (object) :
155     "Generate the officially required documents"
156     def __init__(self, db) :
157         self.db = db
158     def _latex_safe(self, string):
159         string = string.replace('%', '\%')
160         string = string.replace('>', '$>$')
161         string = string.replace('<', '$<$')
162         return string
163     def _set_main_target(self, target):
164         print >> file('./docs/main.tex', 'w'), \
165             """\documentclass[letterpaper]{article}
166
167 \input{%s}
168 """ % target
169     def _make_pdf(self, target_file):
170         os.system('cd ./docs && make pdf')
171         path = os.path.join('./docs/', target_file)
172         os.system('cp ./docs/main.pdf %s' % path)
173         return path
174     def inventory(self, namewidth='a') :
175         "Create a pdf list of all currently owned chemicals."
176         pp = db_pretty_printer(self.db)
177         active_ids = []
178         for record in self.db.records() :
179             if record['Disposed'] == '' : # get ids for chemicals we still have
180                 active_ids.append(record['db_id'])
181         active_fields = ['ID', 'Name', 'Amount',
182                          'H', 'F', 'R', 'O', 'M', 'C', 'T']
183         width = {}
184         for field in active_fields :
185             width[field] = 'a'
186         width['Name'] = namewidth
187         ## Plain text method
188         #string = "Chemical inventory:\t\tGenerated on %s\n\n" \
189         #         % time.strftime('%Y-%m-%d')
190         #string += pp.multi_record_string(active_ids, active_fields,
191         #                                 width=width, FS=' ')
192         # return string
193         ## Latex method
194         string = "\\begin{longtable}{l l l c c c c c c c}\n"
195         string += ('%% The header for the remaining page(s) of the table...\n'
196                    'ID & Name & Amount & H & F & R & O & M & C & T \\\\\n'
197                    '\\hline\n'
198                    '\\endhead\n')
199         for db_id in active_ids :
200             record = self.db.record(db_id)
201             string += "  %s & %s & %s & %s & %s & %s & %s & %s & %s & %s \\\\\n" \
202                       % (self._latex_safe(record['ID']),
203                          self._latex_safe(record['Name']),
204                          self._latex_safe(record['Amount']),
205                          self._latex_safe(record['H']),
206                          self._latex_safe(record['F']),
207                          self._latex_safe(record['R']),
208                          self._latex_safe(record['O']),
209                          self._latex_safe(record['M']),
210                          self._latex_safe(record['C']),
211                          self._latex_safe(record['T']))
212         string += "\\end{longtable}\n"
213         print >> file('./docs/inventory_data.tex', 'w'), string
214         ## alter main.tex to point to the inventory template.
215         self._set_main_target('inventory_template')
216         ## run latex
217         path = self._make_pdf('inventory.pdf')
218         return path
219     def door_warning(self,
220                      valid_record=lambda r: r['Disposed'] == '') :
221         """create a warning NFPA diamond and list of the most dangerous
222         chemicals for which valid_record(record) is true.  For
223         example, to generate a door warning for the front door use
224           door_warning(lambda r: r['Disposed'] == '')
225         or to generate the warning for the fridge
226           door_warning(lambda r: r['Location'] == 'Refrigerator')
227         Note that valid_record defaults to the first example.
228         """
229         pp = db_pretty_printer(self.db)
230         all_ids = range(self.db.len_records())
231
232         # Search the database to find the nasties
233         NFPA_maxs = {'H':0, 'F':0, 'R':0, 'O':[]}
234         Mutagens = []
235         Carcinogens = []
236         Teratogens = []
237         Healths = []
238         Fires = []
239         Reactivities = []
240         Others = []
241         for record in self.db.records() :
242             if valid_record(record) :
243                 for field in ['H', 'F', 'R', 'O'] :
244                     r = record[field]
245                     if r != '' and r != '?' :
246                         if field != 'O' and int(r) > NFPA_maxs[field] :
247                             NFPA_maxs[field] = int(r)
248                         elif field == 'O' and not r in NFPA_maxs['O'] :
249                             NFPA_maxs[field].append(r)
250                 for field,array in zip(['M','C','T'],
251                                        [Mutagens,
252                                         Carcinogens,
253                                         Teratogens]) :
254                     if record[field] != '' and record[field] != '?':
255                         array.append(record['db_id'])
256         # now that we've found the max NFPAs,
257         # find all the chemicals at those levels
258         for record in self.db.records() :
259             if valid_record(record) :
260                 for field,array in zip(['H', 'F', 'R', 'O'],
261                                        [Healths, Fires,
262                                         Reactivities, Others]) :
263                     r = record[field]
264                     if r != '' and r != '?' :
265                         if field != 'O' and int(r) == NFPA_maxs[field] :
266                             array.append(record['db_id'])
267                         elif field == 'O' and r in NFPA_maxs['O'] :
268                             array.append(record['db_id'])
269
270         ## generate the output
271         # first, update the NFPA grapic code
272         if 'OX' in NFPA_maxs['O'] : OX = 'y'
273         else :                      OX = 'n'
274         if 'W'  in NFPA_maxs['O'] : W  = 'y'
275         else :                      W  = 'n'
276         os.system('./docs/mp/gen_NFPA.sh %d %d %d %s %s > ./docs/mp/NFPA.mp'
277                   % (NFPA_maxs['H'], NFPA_maxs['F'], NFPA_maxs['R'], OX, W))
278         # now generate a list of the nasties ( Amount & ID & Name )
279         string = "\\begin{tabular}{r r l}\n"
280         for field,name,array in zip(['H', 'F', 'R', 'O'],
281                                     ['Health', 'Fire',
282                                      'Reactivity', 'Other'],
283                                     [Healths, Fires,
284                                      Reactivities, Others]) :
285             string += "  \multicolumn{3}{c}{\Tstrut %s : %s} \\\\\n" \
286                       % (name, NFPA_maxs[field])
287             for db_id in array :
288                 record = self.db.record(db_id)
289                 string += "  %s  &  %s &  %s \\\\\n" \
290                     % (self._latex_safe(record['Amount']),
291                        self._latex_safe(record['ID']),
292                        self._latex_safe(record['Name']))
293             if len(array) == 0 :
294                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"
295         for hazard,array in zip(['Mutagens','Carcinogens','Teratogens'],
296                                 [Mutagens, Carcinogens, Teratogens]) :
297             string += "  \multicolumn{3}{c}{\Tstrut %s} \\\\\n" % (hazard)
298             for db_id in array :
299                 record = self.db.record(db_id)
300                 string += "  %s  &  %s &  %s \\\\\n" \
301                     % (self._latex_safe(record['Amount']),
302                        self._latex_safe(record['ID']),
303                        self._latex_safe(record['Name']))
304             if len(array) == 0 :
305                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"                
306         string += "\\end{tabular}\n"
307         print >> file('./docs/door_data.tex', 'w'), string
308         ## alter main.tex to point to the door template.
309         self._set_main_target('door_template')
310         ## run latex
311         path = self._make_pdf('door_warning.pdf')
312         return path
313
314 def _test():
315     import doctest
316     doctest.testmod()
317
318 def open_IOfiles(ifilename=None, ofilename=None, debug=False):
319     if ifilename :
320         if debug :  print >> stderr, "open input file '%s'" % ifilename
321         ifile = file(ifilename, 'r')
322     else :
323         ifile = stdin
324     if ofilename :
325         if debug :  print >> stderr, "open output file '%s'" % ofilename
326         ofile = file(ofilename, 'w')
327     else :
328         ofile = stdout    
329     return (ifile, ofile)
330
331 def close_IOfiles(ifilename=None, ifile=stdin, 
332                   ofilename=None, ofile=stdout,
333                   debug=False):
334     if ifilename :
335         if debug :  print >> stderr, "close input file '%s'" % ifilename
336         ifile.close()
337     if ofilename :
338         if debug :  print >> stderr, "close output file '%s'" % ofilename
339         ofile.close()
340
341
342 if __name__ == "__main__" :
343     from optparse import OptionParser
344
345     parser = OptionParser(usage="usage: %prog [options]", version="%prog 0.1")
346
347     parser.add_option('-f', '--input-file', dest="ifilename",
348                       help="Read input from FILE (default stdin)",
349                       type='string', metavar="FILE")
350     parser.add_option('-o', '--output-file', dest="ofilename",
351                       help="Write output to FILE (default stdout)",
352                       type='string', metavar="FILE")
353     parser.add_option('-d', '--delimiter', dest="FS", # field seperator
354                       help="Set field delimiter (default '%default')",
355                       type='string', metavar="DELIM", default='\t')
356     parser.add_option('-p', '--print-fields', dest="print_fields",
357                       help="Only print certain fields (e.g. 0,3,4,2)",
358                       type='string', metavar="FIELDS")
359     parser.add_option('-r', '--print-records', dest="print_records",
360                       help="Only print certain records (e.g. 0:3)",
361                       type='string', metavar="RECORDS")
362     parser.add_option('-w', '--column-width', dest="width",
363                       help="Set column width for short-format output.",
364                       type='string', metavar="WIDTH")
365     parser.add_option('-L', '--long-format', dest="long_format",
366                       help="Print long format (several lines per record)",
367                       action='store_true', default=False)
368     parser.add_option('-l', '--short-format', dest="long_format",
369                       help="Print short format (default) (one lines per record)",
370                       action='store_false', default=False)
371     parser.add_option('-t', '--test', dest="test",
372                       help="Run docutils tests on db.py",
373                       action="store_true", default=False)
374     parser.add_option('-V', '--validate', dest="validate",
375                       help="Validate CAS#s (no other output)",
376                       action="store_true", default=False)
377     parser.add_option('-v', '--verbose', dest="verbose",
378                       help="Print lots of debugging information",
379                       action="store_true", default=False)
380
381     (options, args) = parser.parse_args()
382     parser.destroy()
383
384     ifile,ofile = open_IOfiles(options.ifilename, options.ofilename,
385                                options.verbose)
386
387     if options.test :
388         _test()
389     elif options.validate :
390         db = text_db(filename=None)
391         pp = db_pretty_printer(db)
392
393         # read in and parse the file
394         db._parse(ifile.read())
395
396         CAS_DELIM = ',' # seperate CAS entries for chemicals with multiple CAS numbers
397         PERCENT_DELIM = ':' # seperate CAS number from ingredient percentage
398         for record in db.records() :
399             valid = True
400             cas = record['CAS#']
401             if len(cas.split(CAS_DELIM)) == 0 : # cas = 'N...N-NN-N'
402                 if not valid_CASno(cas, options.verbose) :
403                     valid = False
404                     print >> ofile, "Invalid CAS# in record: '%s'" % cas
405             else : # cas = 'N...N-NN-N:X%,N...N-NN-N:Y%,...'
406                 for casterm in cas.split(CAS_DELIM) : # casterm = 'N...N-NN-N:X%'
407                     c = casterm.split(PERCENT_DELIM)[0]   # c = 'N...N-NN-N'
408                     if not valid_CASno(c, options.verbose) :
409                         valid = False
410                         print >> ofile, "Invalid CAS* in record: '%s'" % c
411             if not valid :
412                 print >> ofile, "in record %s: %s" % (record['ID'], record['Name'])
413                 #pp.full_record_string(record)
414
415     else :
416         db = text_db(filename=None)
417
418         # read in and parse the file
419         db._parse(ifile.read())
420         pp = db_pretty_printer(db)
421         if options.long_format :
422             for id in pp._norm_record_ids(options.print_records) :
423                 string = pp.full_record_string_id(id)
424         else :
425             # pythonize the width option
426             if (options.width != None
427                 and options.width != 'a'
428                 and len(options.width.split(':')) == 1
429                 ) :
430                 width = int(options.width)
431             elif len(options.width.split(':')) > 1 :
432                 width = {}
433                 for kv in options.width.split(',') :
434                     spl = kv.split(':')
435                     assert len(spl) == 2, 'invalid width "%s" in "%s"' % (kv, options.width)
436                     if spl[1] == 'a' :
437                         width[spl[0]] = spl[1]
438                     else :
439                         width[spl[0]] = int(spl[1])
440
441             string = pp.multi_record_string(options.print_records,
442                                             options.print_fields,
443                                             width,
444                                             options.FS)
445             print >> ofile, string,
446             
447     close_IOfiles(options.ifilename, ifile,
448                   options.ofilename, ofile, options.verbose)