Repository created.
[chemdb.git] / chem_db.py
1 #!/usr/bin/python
2
3 """
4 Extend text_db with a CAS# validator, a command line interface, and document generation.
5 """
6
7 from text_db import *
8 import re
9 import os
10 import os.path
11 import time
12
13 def valid_CASno(cas_string, debug=False):
14     """
15     Check N..NN-NN-N format, and the checksum digit for valid CAS number structure.
16     see http://www.cas.org/expertise/cascontent/registry/checkdig.html
17     for N_n .. N_4 N_3 - N_2 N_1 - R
18     R = remainder([sum_{i=1}^n i N_i  ] / 10 )
19     Ignores 'na' and '+secret-non-hazardous'
20     >>> valid_CASno('107-07-3')
21     True
22     >>> valid_CASno('107-08-3')
23     False
24     >>> valid_CASno('107-083')
25     False
26     """
27     for string in ['na', '+secret-non-hazardous']:
28         # the first marks a non-existent CAS#
29         # the last marks items with secret, non-hazardous ingredients for which we have no CAS#
30         if cas_string == string:
31             return True
32     # check format,  
33     # ^ matches the start of the string
34     # \Z matches the end of the string
35     regexp = re.compile('^[0-9]{2,}[-][0-9]{2}[-][0-9]\Z')
36     if regexp.match(cas_string) == None:
37         if debug : print >> stderr, "invalid CAS# format: '%s'" % cas_string
38         return False
39     # generate check digit
40     casdigs = "".join(cas_string.split('-')) # remove '-'
41     sumdigs = list(casdigs[:-1])
42     sumdigs.reverse()
43     sum=0
44     for i in range(len(sumdigs)) :
45         sum += (i+1)*int(sumdigs[i])
46     check = sum % 10
47     if int(casdigs[-1]) == check :
48         return True
49     else :
50         if debug : print >> stderr, "invalid CAS# check: '%s' (expected %d)" % (cas_string, check)
51         return False
52
53 class MSDS_manager (object) :
54     """
55     Manage Material Saftey Data Sheets (MSDSs)
56     """
57     def __init__(self, db, dir="./MSDS/") :
58         self.db = db
59         self.dir = dir
60         self.MIMEs = ['application/pdf',
61                       'text/html',
62                       'text/plain']
63         self.MIME_exts = ['pdf', 'html', 'txt']
64         self.check_dir()
65     def check_dir(self) :
66         "Create the MSDS directory if it's missing"
67         if os.path.isdir(self.dir) :
68             return # all set to go
69         elif os.path.exists(self.dir) :
70             raise Exception, "Error: a non-directory file exists at %s" % self.dir
71         else :
72             os.mkdir(self.dir)
73     def basepath(self, id) :
74         assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
75                                     % (type(id), str(id))
76         return os.path.join(self.dir, "%d" % id)
77     def local_basepath(self, id) : # for symbolic links
78         assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
79                                     % (type(id), str(id))
80         return "./%d" % id
81     def MIME_ext(self, mime) :
82         assert mime in self.MIMEs, \
83             "invalid MIME type '%s'\nshould be one of %s" % (mime, self.MIMEs)
84         i = self.MIMEs.index(mime)
85         ext = self.MIME_exts[i]
86         return ext
87     def path(self, id, mime) :
88         return "%s.%s" % (self.basepath(id), self.MIME_ext(mime))
89     def local_path(self, id, mime) :
90         return "%s.%s" % (self.local_basepath(id), self.MIME_ext(mime))
91     def save(self, id, filetext, mime='application/pdf') :
92         "Save the binary byte string FILE to the path for ID"
93         print >> file(self.path(id, mime), 'wb'), filetext,
94     def link(self, id, target_id) :
95         # target_id already exists, create a symlink to it for id.
96         target_mime = self.get_MSDS_MIME(target_id)
97         target_path = self.local_path(target_id, target_mime)
98         path = self.path(id, target_mime)
99         #os.link(self.path(target_id), self.path(id))   # hard link...
100         os.symlink(target_path, path)                   # ... or soft link
101     def has_MSDS_MIME(self, id, mime) :
102         """
103         >>> m = MSDS_manager(db=None)
104         >>> print m.has_MSDS_type(102, 'pdf') # test on html
105         False
106         >>> print m.has_MSDS_type(102, 'html') # test on html
107         True
108         >>> print m.has_MSDS_type(6, 'pdf') # test on pdf symlink
109         True
110         """
111         return os.path.exists(self.path(id, mime))
112     def get_MSDS_path(self, id) :
113         """
114         >>> m = MSDS_manager(db=None)
115         >>> print m.get_MSDS_path(102) # test on html
116         ./MSDS/102.html
117         >>> print m.get_MSDS_path(1) # test on pdf
118         ./MSDS/1.pdf
119         >>> print m.get_MSDS_path(6) # test on pdf symlink
120         ./MSDS/6.pdf
121         """
122         for mime in self.MIMEs :
123             if self.has_MSDS_MIME(id, mime) :
124                 return self.path(id, mime)
125         return None
126     def get_MSDS_MIME(self, id) :
127         """
128         >>> m = MSDS_manager(db=None)
129         >>> print m.get_MSDS_MIME(102) # test on html
130         text/html
131         >>> print m.get_MSDS_MIME(1) # test on pdf
132         application/pdf
133         >>> print m.get_MSDS_MIME(6) # test on pdf symlink
134         application/pdf
135         """
136         for mime in self.MIMEs :
137             if self.has_MSDS_MIME(id, mime) :
138                 return mime
139         return None
140     def has_MSDS(self, id) :
141         if self.get_MSDS_path(id) == None :
142             return False
143         return True
144     def get_all(self, simlinks=True) :
145         ret = []
146         for record in self.db.records() :
147             p = self.get_MSDS_path( int(record['ID']) )
148             if p != None :
149                 if simlinks == False and os.path.islink( p ) :
150                     continue # ignore the symbolic link
151                 ret.append({'ID':record['ID'], 'Name':record['Name']})
152         return ret
153
154 class docgen (object) :
155     "Generate the officially required documents"
156     def __init__(self, db) :
157         self.db = db
158     def _latex_safe(self, string):
159         string = string.replace('%', '\%')
160         string = string.replace('>', '$>$')
161         string = string.replace('<', '$<$')
162         return string
163     def _set_main_target(self, target):
164         print >> file('./docs/main.tex', 'w'), \
165             """\documentclass[letterpaper]{article}
166
167 \input{%s}
168 """ % target
169     def _make_pdf(self, target_file):
170         os.system('cd ./docs && make pdf')
171         path = os.path.join('./docs/', target_file)
172         os.system('cp ./docs/main.pdf %s' % path)
173         return path
174     def inventory(self, namewidth='a') :
175         pp = db_pretty_printer(self.db)
176         active_ids = []
177         for record in self.db.records() :
178             if record['Disposed'] == '' : # get ids for chemicals we still have
179                 active_ids.append(record['db_id'])
180         active_fields = ['ID', 'Name', 'Amount',
181                          'H', 'F', 'R', 'O', 'M', 'C', 'T']
182         width = {}
183         for field in active_fields :
184             width[field] = 'a'
185         width['Name'] = namewidth
186         ## Plain text method
187         #string = "Chemical inventory:\t\tGenerated on %s\n\n" \
188         #         % time.strftime('%Y-%m-%d')
189         #string += pp.multi_record_string(active_ids, active_fields,
190         #                                 width=width, FS=' ')
191         # return string
192         ## Latex method
193         string = "\\begin{longtable}{l l l c c c c c c c}\n"
194         string += ('%% The header for the remaining page(s) of the table...\n'
195                    'ID & Name & Amount & H & F & R & O & M & C & T \\\\\n'
196                    '\\hline\n'
197                    '\\endhead\n')
198         for db_id in active_ids :
199             record = self.db.record(db_id)
200             string += "  %s & %s & %s & %s & %s & %s & %s & %s & %s & %s \\\\\n" \
201                       % (self._latex_safe(record['ID']),
202                          self._latex_safe(record['Name']),
203                          self._latex_safe(record['Amount']),
204                          self._latex_safe(record['H']),
205                          self._latex_safe(record['F']),
206                          self._latex_safe(record['R']),
207                          self._latex_safe(record['O']),
208                          self._latex_safe(record['M']),
209                          self._latex_safe(record['C']),
210                          self._latex_safe(record['T']))
211         string += "\\end{longtable}\n"
212         print >> file('./docs/inventory_data.tex', 'w'), string
213         ## alter main.tex to point to the inventory template.
214         self._set_main_target('inventory_template')
215         ## run latex
216         path = self._make_pdf('inventory.pdf')
217         return path
218     def door_warning(self) :
219         pp = db_pretty_printer(self.db)
220         all_ids = range(self.db.len_records())
221
222         # Search the database to find the nasties
223         NFPA_maxs = {'H':0, 'F':0, 'R':0, 'O':[]}
224         Mutagens = []
225         Carcinogens = []
226         Teratogens = []
227         Healths = []
228         Fires = []
229         Reactivities = []
230         Others = []
231         for record in self.db.records() :
232             if record['Disposed'] == '' : # chemicals we still have
233                 for field in ['H', 'F', 'R', 'O'] :
234                     r = record[field]
235                     if r != '' and r != '?' :
236                         if field != 'O' and int(r) > NFPA_maxs[field] :
237                             NFPA_maxs[field] = int(r)
238                         elif field == 'O' and not r in NFPA_maxs['O'] :
239                             NFPA_maxs[field].append(r)
240                 for field,array in zip(['M','C','T'],
241                                        [Mutagens,
242                                         Carcinogens,
243                                         Teratogens]) :
244                     if record[field] != '' and record[field] != '?':
245                         array.append(record['db_id'])
246         # now that we've found the max NFPAs,
247         # find all the chemicals at those levels
248         for record in self.db.records() :
249             if record['Disposed'] == '' : # chemicals we still have
250                 for field,array in zip(['H', 'F', 'R', 'O'],
251                                        [Healths, Fires,
252                                         Reactivities, Others]) :
253                     r = record[field]
254                     if r != '' and r != '?' :
255                         if field != 'O' and int(r) == NFPA_maxs[field] :
256                             array.append(record['db_id'])
257                         elif field == 'O' and r in NFPA_maxs['O'] :
258                             array.append(record['db_id'])
259
260         ## generate the output
261         # first, update the NFPA grapic code
262         if 'OX' in NFPA_maxs['O'] : OX = 'y'
263         else :                      OX = 'n'
264         if 'W'  in NFPA_maxs['O'] : W  = 'y'
265         else :                      W  = 'n'
266         os.system('./docs/mp/gen_NFPA.sh %d %d %d %s %s > ./docs/mp/NFPA.mp'
267                   % (NFPA_maxs['H'], NFPA_maxs['F'], NFPA_maxs['R'], OX, W))
268         # now generate a list of the nasties ( Amount & ID & Name )
269         string = "\\begin{tabular}{r r l}\n"
270         for field,name,array in zip(['H', 'F', 'R', 'O'],
271                                     ['Health', 'Fire',
272                                      'Reactivity', 'Other'],
273                                     [Healths, Fires,
274                                      Reactivities, Others]) :
275             string += "  \multicolumn{3}{c}{\Tstrut %s : %s} \\\\\n" \
276                       % (name, NFPA_maxs[field])
277             for db_id in array :
278                 record = self.db.record(db_id)
279                 string += "  %s  &  %s &  %s \\\\\n" \
280                     % (self._latex_safe(record['Amount']),
281                        self._latex_safe(record['ID']),
282                        self._latex_safe(record['Name']))
283             if len(array) == 0 :
284                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"
285         for hazard,array in zip(['Mutagens','Carcinogens','Teratogens'],
286                                 [Mutagens, Carcinogens, Teratogens]) :
287             string += "  \multicolumn{3}{c}{\Tstrut %s} \\\\\n" % (hazard)
288             for db_id in array :
289                 record = self.db.record(db_id)
290                 string += "  %s  &  %s &  %s \\\\\n" \
291                     % (self._latex_safe(record['Amount']),
292                        self._latex_safe(record['ID']),
293                        self._latex_safe(record['Name']))
294             if len(array) == 0 :
295                 string += "  \multicolumn{3}{c}{ --- } \\\\\n"                
296         string += "\\end{tabular}\n"
297         print >> file('./docs/door_data.tex', 'w'), string
298         ## alter main.tex to point to the door template.
299         self._set_main_target('door_template')
300         ## run latex
301         path = self._make_pdf('door_warning.pdf')
302         return path
303
304 def _test():
305     import doctest
306     doctest.testmod()
307
308 def open_IOfiles(ifilename=None, ofilename=None, debug=False):
309     if ifilename :
310         if debug :  print >> stderr, "open input file '%s'" % ifilename
311         ifile = file(ifilename, 'r')
312     else :
313         ifile = stdin
314     if ofilename :
315         if debug :  print >> stderr, "open output file '%s'" % ofilename
316         ofile = file(ofilename, 'w')
317     else :
318         ofile = stdout    
319     return (ifile, ofile)
320
321 def close_IOfiles(ifilename=None, ifile=stdin, 
322                   ofilename=None, ofile=stdout,
323                   debug=False):
324     if ifilename :
325         if debug :  print >> stderr, "close input file '%s'" % ifilename
326         ifile.close()
327     if ofilename :
328         if debug :  print >> stderr, "close output file '%s'" % ofilename
329         ofile.close()
330
331
332 if __name__ == "__main__" :
333     from optparse import OptionParser
334
335     parser = OptionParser(usage="usage: %prog [options]", version="%prog 0.1")
336
337     parser.add_option('-f', '--input-file', dest="ifilename",
338                       help="Read input from FILE (default stdin)",
339                       type='string', metavar="FILE")
340     parser.add_option('-o', '--output-file', dest="ofilename",
341                       help="Write output to FILE (default stdout)",
342                       type='string', metavar="FILE")
343     parser.add_option('-d', '--delimiter', dest="FS", # field seperator
344                       help="Set field delimiter (default '%default')",
345                       type='string', metavar="DELIM", default='\t')
346     parser.add_option('-p', '--print-fields', dest="print_fields",
347                       help="Only print certain fields (e.g. 0,3,4,2)",
348                       type='string', metavar="FIELDS")
349     parser.add_option('-r', '--print-records', dest="print_records",
350                       help="Only print certain records (e.g. 0:3)",
351                       type='string', metavar="RECORDS")
352     parser.add_option('-w', '--column-width', dest="width",
353                       help="Set column width for short-format output.",
354                       type='string', metavar="WIDTH")
355     parser.add_option('-L', '--long-format', dest="long_format",
356                       help="Print long format (several lines per record)",
357                       action='store_true', default=False)
358     parser.add_option('-l', '--short-format', dest="long_format",
359                       help="Print short format (default) (one lines per record)",
360                       action='store_false', default=False)
361     parser.add_option('-t', '--test', dest="test",
362                       help="Run docutils tests on db.py",
363                       action="store_true", default=False)
364     parser.add_option('-V', '--validate', dest="validate",
365                       help="Validate CAS#s (no other output)",
366                       action="store_true", default=False)
367     parser.add_option('-v', '--verbose', dest="verbose",
368                       help="Print lots of debugging information",
369                       action="store_true", default=False)
370
371     (options, args) = parser.parse_args()
372     parser.destroy()
373
374     ifile,ofile = open_IOfiles(options.ifilename, options.ofilename,
375                                options.verbose)
376
377     if options.test :
378         _test()
379     elif options.validate :
380         db = text_db(filename=None)
381         pp = db_pretty_printer(db)
382
383         # read in and parse the file
384         db._parse(ifile.read())
385
386         CAS_DELIM = ',' # seperate CAS entries for chemicals with multiple CAS numbers
387         PERCENT_DELIM = ':' # seperate CAS number from ingredient percentage
388         for record in db.records() :
389             valid = True
390             cas = record['CAS#']
391             if len(cas.split(CAS_DELIM)) == 0 : # cas = 'N...N-NN-N'
392                 if not valid_CASno(cas, options.verbose) :
393                     valid = False
394                     print >> ofile, "Invalid CAS# in record: '%s'" % cas
395             else : # cas = 'N...N-NN-N:X%,N...N-NN-N:Y%,...'
396                 for casterm in cas.split(CAS_DELIM) : # casterm = 'N...N-NN-N:X%'
397                     c = casterm.split(PERCENT_DELIM)[0]   # c = 'N...N-NN-N'
398                     if not valid_CASno(c, options.verbose) :
399                         valid = False
400                         print >> ofile, "Invalid CAS* in record: '%s'" % c
401             if not valid :
402                 print >> ofile, "in record %s: %s" % (record['ID'], record['Name'])
403                 #pp.full_record_string(record)
404
405     else :
406         db = text_db(filename=None)
407
408         # read in and parse the file
409         db._parse(ifile.read())
410         pp = db_pretty_printer(db)
411         if options.long_format :
412             for id in pp._norm_record_ids(options.print_records) :
413                 string = pp.full_record_string_id(id)
414         else :
415             # pythonize the width option
416             if (options.width != None
417                 and options.width != 'a'
418                 and len(options.width.split(':')) == 1
419                 ) :
420                 width = int(options.width)
421             elif len(options.width.split(':')) > 1 :
422                 width = {}
423                 for kv in options.width.split(',') :
424                     spl = kv.split(':')
425                     assert len(spl) == 2, 'invalid width "%s" in "%s"' % (kv, options.width)
426                     if spl[1] == 'a' :
427                         width[spl[0]] = spl[1]
428                     else :
429                         width[spl[0]] = int(spl[1])
430
431             string = pp.multi_record_string(options.print_records,
432                                             options.print_fields,
433                                             width,
434                                             options.FS)
435             print >> ofile, string,
436             
437     close_IOfiles(options.ifilename, ifile,
438                   options.ofilename, ofile, options.verbose)