4 Extend text_db with a CAS# validator, a command line interface, and document generation.
def valid_CASno(cas_string, debug=False):
    """
    Check N..NN-NN-N format, and the checksum digit for valid CAS number structure.
    see http://www.cas.org/expertise/cascontent/registry/checkdig.html
    for N_n .. N_4 N_3 - N_2 N_1 - R
      R = remainder([sum_{i=1}^n i N_i ] / 10 )
    Ignores 'na' and '+secret-non-hazardous'

    cas_string -- the candidate CAS number as a string
    debug      -- when true, explain each rejection on stderr

    Returns True for a structurally valid CAS number (or one of the two
    ignored markers), False otherwise.

    >>> valid_CASno('107-07-3')
    True
    >>> valid_CASno('107-08-3')
    False
    >>> valid_CASno('107-083')
    False
    """
    # 'na' marks a non-existent CAS#; '+secret-non-hazardous' marks items with
    # secret, non-hazardous ingredients for which we have no CAS#.
    if cas_string in ('na', '+secret-non-hazardous'):
        return True
    # ^ matches the start of the string, \Z matches the very end (unlike $,
    # it does not tolerate a trailing newline).
    if re.match(r'^[0-9]{2,}[-][0-9]{2}[-][0-9]\Z', cas_string) is None:
        if debug:
            stderr.write("invalid CAS# format: '%s'\n" % cas_string)
        return False
    # generate check digit
    casdigs = cas_string.replace('-', '')
    # N_1 is the digit immediately left of the check digit, so the weights
    # 1, 2, 3, ... are applied from right to left.
    total = sum(weight * int(digit)
                for weight, digit in enumerate(reversed(casdigs[:-1]), 1))
    check = total % 10
    if int(casdigs[-1]) == check:
        return True
    if debug:
        stderr.write("invalid CAS# check: '%s' (expected %d)\n"
                     % (cas_string, check))
    return False
# MSDS files are kept in one directory (default "./MSDS/") and named by record
# ID plus an extension chosen from the MIME type (see basepath() and MIME_ext()).
# NOTE(review): this extract lacks several original lines here (docstring
# quotes, the remaining __init__ assignments, the header of the
# directory-checking method and its final branch) -- confirm against the full file.
# NOTE(review): `raise Exception, "..."` below is Python 2-only syntax.
54 class MSDS_manager (object) :
56 Manage Material Saftey Data Sheets (MSDSs)
58 def __init__(self, db, dir="./MSDS/") :
# MIMEs and MIME_exts are parallel lists: MIME_ext() returns the extension at
# the same index as the MIME type.
61 self.MIMEs = ['application/pdf',
64 self.MIME_exts = ['pdf', 'html', 'txt']
67 "Create the MSDS directory if it's missing"
68 if os.path.isdir(self.dir) :
69 return # all set to go
70 elif os.path.exists(self.dir) :
71 raise Exception, "Error: a non-directory file exists at %s" % self.dir
def basepath(self, id):
    """Return the extension-less path of the MSDS file for record ID.

    id -- integer record ID; anything else trips the assertion.

    The result is self.dir joined with the decimal ID, e.g. "./MSDS/7".
    """
    # bool is a subclass of int; the exact-type check used previously rejected
    # it, so keep rejecting it here.
    assert isinstance(id, int) and not isinstance(id, bool), \
        'id must be an integer, not %s (%s)' % (type(id), repr(id))
    return os.path.join(self.dir, "%d" % id)
# Counterpart of basepath() used for symbolic links: link() passes
# local_path(), which is built on this value, as the symlink target.
# NOTE(review): the assert's format arguments and the return statement are not
# present in this extract -- confirm the returned form against the full file.
78 def local_basepath(self, id) : # for symbolic links
79 assert type(id) == types.IntType, 'id must be an integer, not %s (%s)' \
def MIME_ext(self, mime):
    """Return the file extension registered for MIME type MIME.

    self.MIMEs and self.MIME_exts are parallel lists; the extension is the
    entry of MIME_exts at the index where MIME appears in MIMEs.
    An unknown MIME type trips the assertion.
    """
    assert mime in self.MIMEs, \
        "invalid MIME type '%s'\nshould be one of %s" % (mime, self.MIMEs)
    return self.MIME_exts[self.MIMEs.index(mime)]
def path(self, id, mime):
    """Return the full MSDS path for record ID stored as MIME type MIME.

    The path is basepath(id) plus "." plus the extension for MIME.
    """
    return "%s.%s" % (self.basepath(id), self.MIME_ext(mime))
def local_path(self, id, mime):
    """Return local_basepath(id) plus "." plus the extension for MIME.

    link() uses this form as the target of the symbolic links it creates.
    """
    return "%s.%s" % (self.local_basepath(id), self.MIME_ext(mime))
def save(self, id, filetext, mime='application/pdf'):
    """Save the binary byte string FILETEXT to the path for ID.

    id       -- record ID the MSDS belongs to
    filetext -- raw file contents (byte string)
    mime     -- MIME type, which selects the file extension
    """
    # Write and close explicitly: the previous form opened the file inside a
    # print statement and never closed it, leaving the flush to the garbage
    # collector.
    msds_file = open(self.path(id, mime), 'wb')
    try:
        msds_file.write(filetext)
    finally:
        msds_file.close()
def link(self, id, target_id):
    """Make the MSDS for ID a symbolic link to TARGET_ID's existing MSDS.

    The link is created at path(id, mime) and points at
    local_path(target_id, mime), where mime is the MIME type under which
    TARGET_ID's MSDS is stored.
    """
    target_mime = self.get_MSDS_MIME(target_id)
    # Without this check a missing target surfaced later as the misleading
    # "invalid MIME type 'None'" assertion from MIME_ext().
    assert target_mime is not None, \
        "no MSDS found for target id %s" % target_id
    target_path = self.local_path(target_id, target_mime)
    link_path = self.path(id, target_mime)
    # A soft link is used; a hard link (os.link) would be the alternative.
    os.symlink(target_path, link_path)
def has_MSDS_MIME(self, id, mime):
    """Return True if an MSDS of MIME type MIME exists on disk for ID.

    MIME must be one of self.MIMEs (a MIME type, not a file extension).
    The examples need the sample MSDS directory, in which record 102 is
    stored as html, record 1 as pdf and record 6 as a symlink to a pdf.

    >>> m = MSDS_manager(db=None)
    >>> m.has_MSDS_MIME(102, 'application/pdf') # test on html
    False
    >>> m.has_MSDS_MIME(1, 'application/pdf') # test on pdf
    True
    >>> m.has_MSDS_MIME(6, 'application/pdf') # test on pdf symlink
    True
    """
    return os.path.exists(self.path(id, mime))
def get_MSDS_path(self, id):
    """Return the path of the MSDS stored for ID, or None if there is none.

    MIME types are tried in the order of self.MIMEs; the first one with a
    file on disk wins.  The examples need the sample MSDS directory.

    >>> m = MSDS_manager(db=None)
    >>> m.get_MSDS_path(1) # test on pdf
    './MSDS/1.pdf'
    >>> m.get_MSDS_path(6) # test on pdf symlink
    './MSDS/6.pdf'
    """
    for mime in self.MIMEs:
        if self.has_MSDS_MIME(id, mime):
            return self.path(id, mime)
    return None
def get_MSDS_MIME(self, id):
    """Return the MIME type of the MSDS stored for ID, or None.

    MIME types are tried in the order of self.MIMEs; the first one with a
    file on disk is returned.  The examples need the sample MSDS directory.

    >>> m = MSDS_manager(db=None)
    >>> m.get_MSDS_MIME(1) # test on pdf
    'application/pdf'
    >>> m.get_MSDS_MIME(6) # test on pdf symlink
    'application/pdf'
    """
    for mime in self.MIMEs:
        if self.has_MSDS_MIME(id, mime):
            return mime
    return None
def has_MSDS(self, id):
    """Return True if an MSDS of any known MIME type exists for ID."""
    return self.get_MSDS_path(id) is not None
def get_all(self, simlinks=True):
    """List the records that have an MSDS on disk.

    simlinks -- pass False to leave out records whose MSDS is only a
                symbolic link to another record's file.

    Returns a list of {'ID': ..., 'Name': ...} dicts in database order.
    """
    ret = []
    for record in self.db.records():
        p = self.get_MSDS_path(int(record['ID']))
        if p is None:
            continue  # no MSDS stored for this record
        # Compared with == on purpose: only False (or 0) disables links,
        # exactly as before; None keeps them.
        if simlinks == False and os.path.islink(p):
            continue  # ignore the symbolic link
        ret.append({'ID': record['ID'], 'Name': record['Name']})
    return ret
# docgen writes LaTeX fragments under ./docs/ and builds PDFs from them
# (see inventory() and door_warning()).
# NOTE(review): the body of __init__ is not present in this extract; the other
# methods read self.db, so it presumably stores `db` there -- confirm.
155 class docgen (object) :
156 "Generate the officially required documents"
157 def __init__(self, db) :
159 def _latex_safe(self, string):
160 string = string.replace('%', '\%')
161 string = string.replace('>', '$>$')
162 string = string.replace('<', '$<$')
# Overwrites ./docs/main.tex with a LaTeX document that starts with the
# \documentclass line below; inventory() and door_warning() call this with a
# template name before building the PDF.
# NOTE(review): the rest of the template text (presumably where `target` is
# substituted) is not present in this extract -- confirm against the full file.
164 def _set_main_target(self, target):
165 print >> file('./docs/main.tex', 'w'), \
166 """\documentclass[letterpaper]{article}
def _make_pdf(self, target_file):
    """Build ./docs/main.pdf with make and copy it to ./docs/TARGET_FILE.

    target_file -- file name (inside ./docs/) for the finished PDF

    Returns the path of the copy; inventory() and door_warning() hand that
    path back to their callers.
    """
    # The exit status of both shell commands is ignored, as before.
    os.system('cd ./docs && make pdf')
    path = os.path.join('./docs/', target_file)
    os.system('cp ./docs/main.pdf %s' % path)
    return path
# Writes ./docs/inventory_title.tex and ./docs/inventory_data.tex (a LaTeX
# longtable with one row per matching record), points main.tex at
# 'inventory_template' and builds inventory.pdf through _make_pdf().
# NOTE(review): the docstring says the sort field defaults to 'ID', but the
# signature default is 'db_id'.
# NOTE(review): list.sort(cmp=...) and `print >> file(...)` are Python 2-only.
# NOTE(review): several original lines are absent from this extract (list and
# dict initialisation, the rest of the table header literal, the final
# return) -- confirm against the full file.
175 def inventory(self, title=None,
176 namewidth='a', sort_field='db_id',
177 valid_record=lambda r: r['Disposed'] == '') :
178 """Create a pdf list of all maching chemicals. The default is to
179 match all currently owned chemicals. Matching chemicals can be sorted
180 by any field (defaults to 'ID')."""
183 pp = db_pretty_printer(self.db)
185 for record in self.db.records() :
186 if valid_record(record) : # get ids for matching chemicals
187 active_ids.append(record['db_id'])
# Order the matching ids by the value of sort_field in their records.
188 active_ids.sort(cmp=lambda a,b: cmp(self.db.record(a)[sort_field],
189 self.db.record(b)[sort_field]))
190 active_fields = ['ID', 'Name', 'Amount',
191 'H', 'F', 'R', 'O', 'M', 'C', 'T']
193 for field in active_fields :
195 width['Name'] = namewidth
197 #string = "Chemical inventory:\t\tGenerated on %s\n\n" \
198 # % time.strftime('%Y-%m-%d')
199 #string += pp.multi_record_string(active_ids, active_fields,
200 # width=width, FS=' ')
203 string = "\\begin{longtable}{l l l c c c c c c c}\n"
204 string += ('%% The header for the remaining page(s) of the table...\n'
205 'ID & Name & Amount & H & F & R & O & M & C & T \\\\\n'
# One table row per matching record; every cell goes through _latex_safe().
208 for db_id in active_ids :
209 record = self.db.record(db_id)
210 string += " %s & %s & %s & %s & %s & %s & %s & %s & %s & %s \\\\\n" \
211 % (self._latex_safe(record['ID']),
212 self._latex_safe(record['Name']),
213 self._latex_safe(record['Amount']),
214 self._latex_safe(record['H']),
215 self._latex_safe(record['F']),
216 self._latex_safe(record['R']),
217 self._latex_safe(record['O']),
218 self._latex_safe(record['M']),
219 self._latex_safe(record['C']),
220 self._latex_safe(record['T']))
221 string += "\\end{longtable}\n"
222 print >> file('./docs/inventory_title.tex', 'w'), title
223 print >> file('./docs/inventory_data.tex', 'w'), string
224 ## alter main.tex to point to the inventory template.
225 self._set_main_target('inventory_template')
227 path = self._make_pdf('inventory.pdf')
# Two passes over the records accepted by valid_record: the first finds the
# maximum H/F/R ratings and the set of distinct 'O' codes (and collects the
# records flagged M, C or T); the second collects the records sitting at
# those maxima.  The result is written to ./docs/door_data.tex and built
# into door_warning.pdf through _make_pdf().
# NOTE(review): several original lines are absent from this extract (the
# closing docstring quotes, initialisation of the per-hazard lists, some loop
# headers and the final return) -- confirm against the full file.
229 def door_warning(self,
230 valid_record=lambda r: r['Disposed'] == '') :
231 """create a warning NFPA diamond and list of the most dangerous
232 chemicals for which valid_record(record) is true. For
233 example, to generate a door warning for the front door use
234 door_warning(lambda r: r['Disposed'] == '')
235 or to generate the warning for the fridge
236 door_warning(lambda r: r['Location'] == 'Refrigerator')
237 Note that valid_record defaults to the first example.
# NOTE(review): pp and all_ids are not used in the lines visible here.
239 pp = db_pretty_printer(self.db)
240 all_ids = range(self.db.len_records())
242 # Search the database to find the nasties
# H, F and R hold the highest integer rating seen; 'O' holds a list of the
# distinct non-empty codes seen.
243 NFPA_maxs = {'H':0, 'F':0, 'R':0, 'O':[]}
251 for record in self.db.records() :
252 if valid_record(record) :
253 for field in ['H', 'F', 'R', 'O'] :
# '' and '?' are treated as "no rating" and skipped.
255 if r != '' and r != '?' :
256 if field != 'O' and int(r) > NFPA_maxs[field] :
257 NFPA_maxs[field] = int(r)
258 elif field == 'O' and not r in NFPA_maxs['O'] :
259 NFPA_maxs[field].append(r)
260 for field,array in zip(['M','C','T'],
264 if record[field] != '' and record[field] != '?':
265 array.append(record['db_id'])
266 # now that we've found the max NFPAs,
267 # find all the chemicals at those levels
268 for record in self.db.records() :
269 if valid_record(record) :
270 for field,array in zip(['H', 'F', 'R', 'O'],
272 Reactivities, Others]) :
274 if r != '' and r != '?' :
275 if field != 'O' and int(r) == NFPA_maxs[field] :
276 array.append(record['db_id'])
277 elif field == 'O' and r in NFPA_maxs['O'] :
278 array.append(record['db_id'])
280 ## generate the output
281 # first, update the NFPA grapic code
282 if 'OX' in NFPA_maxs['O'] : OX = 'y'
284 if 'W' in NFPA_maxs['O'] : W = 'y'
# The shell script's output is redirected into ./docs/mp/NFPA.mp; its exit
# status is not checked.
286 os.system('./docs/mp/gen_NFPA.sh %d %d %d %s %s > ./docs/mp/NFPA.mp'
287 % (NFPA_maxs['H'], NFPA_maxs['F'], NFPA_maxs['R'], OX, W))
288 # now generate a list of the nasties ( Amount & ID & Name )
289 string = "\\begin{tabular}{r r l}\n"
290 for field,name,array in zip(['H', 'F', 'R', 'O'],
292 'Reactivity', 'Other'],
294 Reactivities, Others]) :
# Integer maxima have no __len__, so they always take the "name : value"
# heading; the 'O' list takes it only when non-empty.
295 if (not hasattr(NFPA_maxs[field], '__len__')) \
296 or len(NFPA_maxs[field]) > 0 :
297 string += " \multicolumn{3}{c}{\Tstrut %s : %s} \\\\\n" \
298 % (name, NFPA_maxs[field])
299 else : # Print "Other" instead of "Other : []"
300 string += " \multicolumn{3}{c}{\Tstrut %s} \\\\\n" \
303 record = self.db.record(db_id)
304 string += " %s & %s & %s \\\\\n" \
305 % (self._latex_safe(record['Amount']),
306 self._latex_safe(record['ID']),
307 self._latex_safe(record['Name']))
309 string += " \multicolumn{3}{c}{ --- } \\\\\n"
310 for hazard,array in zip(['Mutagens','Carcinogens','Teratogens'],
311 [Mutagens, Carcinogens, Teratogens]) :
312 string += " \multicolumn{3}{c}{\Tstrut %s} \\\\\n" % (hazard)
314 record = self.db.record(db_id)
315 string += " %s & %s & %s \\\\\n" \
316 % (self._latex_safe(record['Amount']),
317 self._latex_safe(record['ID']),
318 self._latex_safe(record['Name']))
320 string += " \multicolumn{3}{c}{ --- } \\\\\n"
321 string += "\\end{tabular}\n"
322 print >> file('./docs/door_data.tex', 'w'), string
323 ## alter main.tex to point to the door template.
324 self._set_main_target('door_template')
326 path = self._make_pdf('door_warning.pdf')
def open_IOfiles(ifilename=None, ofilename=None, debug=False):
    """Open the input and output streams for the command line interface.

    ifilename -- file to read from; stdin is used when empty or None
    ofilename -- file to write to; stdout is used when empty or None
    debug     -- when true, report each file that is opened on stderr

    Returns the tuple (ifile, ofile).
    """
    if ifilename:
        if debug:
            stderr.write("open input file '%s'\n" % ifilename)
        # open() rather than file(): the file() builtin no longer exists in
        # Python 3, while open() works everywhere.
        ifile = open(ifilename, 'r')
    else:
        ifile = stdin
    if ofilename:
        if debug:
            stderr.write("open output file '%s'\n" % ofilename)
        ofile = open(ofilename, 'w')
    else:
        ofile = stdout
    return (ifile, ofile)
def close_IOfiles(ifilename=None, ifile=stdin,
                  ofilename=None, ofile=stdout,
                  debug=False):
    """Close the streams returned by open_IOfiles().

    A stream is closed only when its file name was given, so stdin and
    stdout (used when no name is supplied) are left open.

    debug -- when true, report each file that is closed on stderr
    """
    if ifilename:
        if debug:
            stderr.write("close input file '%s'\n" % ifilename)
        ifile.close()
    if ofilename:
        if debug:
            stderr.write("close output file '%s'\n" % ofilename)
        ofile.close()
# Command line entry point: parse the options, open the input/output streams,
# run exactly one mode (list locations, validate CAS numbers, audit, PDF
# inventory, PDF door warning, or plain record printing) and close the streams.
# NOTE(review): several original lines are absent from this extract (some
# branch headers, simple assignments and closing lines of calls) -- confirm
# against the full file before changing the control flow.
357 if __name__ == '__main__' :
358 from optparse import OptionParser
360 parser = OptionParser(usage='usage: %prog [options]', version='%prog 0.1')
362 parser.add_option('-f', '--input-file', dest='ifilename',
363 help='Read input from FILE (default stdin)',
364 type='string', metavar='FILE')
365 parser.add_option('-o', '--output-file', dest='ofilename',
366 help='Write output to FILE (default stdout)',
367 type='string', metavar='FILE')
368 parser.add_option('-d', '--delimiter', dest='FS', # field seperator
369 help="Set field delimiter (default '%default')",
370 type='string', metavar='DELIM', default='\t')
371 parser.add_option('-p', '--print-fields', dest='print_fields',
372 help='Only print certain fields (e.g. 0,3,4,2)',
373 type='string', metavar='FIELDS')
374 parser.add_option('-r', '--print-records', dest='print_records',
375 help='Only print certain records (e.g. 0:3)',
376 type='string', metavar='RECORDS')
377 parser.add_option('-w', '--column-width', dest='width',
378 help='Set column width for short-format output.',
379 type='string', metavar='WIDTH')
# -L and -l share dest='long_format'; whichever appears last on the command
# line wins.
380 parser.add_option('-L', '--long-format', dest='long_format',
381 help='Print long format (several lines per record)',
382 action='store_true', default=False)
383 parser.add_option('-l', '--short-format', dest='long_format',
384 help='Print short format (default) (one lines per record)',
385 action='store_false', default=False)
# The EXPRESSION given here is later passed to eval() with the record bound
# to `r` (see the inventory and door-warning branches).
386 parser.add_option('--valid-record', dest='valid_record',
387 help="Select fields where True == lambda r : eval(EXPRESSION). default '%default'",
388 type='string', metavar='EXPRESSION', default="r['Disposed'] == ''")
389 parser.add_option('--sort-field', dest='sort_field',
390 help="Sort matching records by FIELD (defauly '%default')",
391 type='string', metavar='FIELD', default='db_id')
392 parser.add_option('--pdf-title', dest='pdf_title',
393 help='Override the default PDF title',
394 type='string', metavar='TITLE')
395 parser.add_option('--inventory', dest='inventory',
396 help='Output a PDF inventory of matching records',
397 action='store_true', default=False)
398 parser.add_option('--door-warning', dest='door_warning',
399 help='Output a PDF door warning of matching records',
400 action='store_true', default=False)
# NOTE(review): the help text says "docutils"; the docstrings in this file
# contain doctest-style examples, so "doctest" is presumably meant -- confirm.
401 parser.add_option('-t', '--test', dest='test',
402 help='Run docutils tests on db.py',
403 action='store_true', default=False)
404 parser.add_option('--list-locations', dest='locations',
405 help='List all currently used locations (no other output)',
406 action='store_true', default=False)
407 parser.add_option('-V', '--validate', dest='validate',
408 help='Validate CAS#s (no other output)',
409 action='store_true', default=False)
410 parser.add_option('-A', '--audit', dest='audit',
411 help='Search for troublesome entries (no other output)',
412 action='store_true', default=False)
413 parser.add_option('-v', '--verbose', dest='verbose',
414 help='Print lots of debugging information',
415 action='store_true', default=False)
417 (options, args) = parser.parse_args()
420 ifile,ofile = open_IOfiles(options.ifilename, options.ofilename,
# Mode: print each distinct non-empty Location, one per line.
425 elif options.locations :
426 db = text_db(filename=None)
427 pp = db_pretty_printer(db)
429 # read in and parse the file
430 db._parse(ifile.read())
433 for record in db.records():
434 if len(record['Location']) > 0 and record['Location'] not in locations:
435 locations.append(record['Location'])
437 print >> ofile, '\n'.join(locations)
# Mode: report every CAS number that fails valid_CASno().
438 elif options.validate :
439 db = text_db(filename=None)
440 pp = db_pretty_printer(db)
442 # read in and parse the file
443 db._parse(ifile.read())
445 CAS_DELIM = ',' # seperate CAS entries for chemicals with multiple CAS numbers
446 PERCENT_DELIM = ':' # seperate CAS number from ingredient percentage
447 for record in db.records() :
# NOTE(review): str.split with an explicit delimiter always returns at least
# one element, so this length can never be 0 and the first branch is
# unreachable; the else branch also copes with a single CAS number.
450 if len(cas.split(CAS_DELIM)) == 0 : # cas = 'N...N-NN-N'
451 if not valid_CASno(cas, options.verbose) :
453 print >> ofile, "Invalid CAS# in record: '%s'" % cas
454 else : # cas = 'N...N-NN-N:X%,N...N-NN-N:Y%,...'
455 for casterm in cas.split(CAS_DELIM) : # casterm = 'N...N-NN-N:X%'
456 c = casterm.split(PERCENT_DELIM)[0] # c = 'N...N-NN-N'
457 if not valid_CASno(c, options.verbose) :
459 print >> ofile, "Invalid CAS* in record: '%s'" % c
461 print >> ofile, "in record %s: %s" % (record['ID'], record['Name'])
462 #pp.full_record_string(record)
# Mode (its branch header is not in this extract; presumably the audit
# option): flag stray whitespace and undisposed records with no Location.
464 db = text_db(filename=None)
465 pp = db_pretty_printer(db)
467 # read in and parse the file
468 db._parse(ifile.read())
470 for record in db.records():
471 # check for extra spaces
472 for key,value in record.items():
473 if type(value) in types.StringTypes and value.strip() != value:
474 print >> ofile, "Extra whitespace for %s - %s field %s : '%s'" % (record['ID'], record['Name'], key, value)
475 # make sure we know the location of all current chemicals
476 if len(record['Disposed']) == 0 and len(record['Location']) == 0:
477 print >> ofile, "Misplaced record: %s - %s" % (record['ID'], record['Name'])
# Mode: build the PDF inventory and print its path.
478 elif options.inventory:
479 db = text_db(filename=None)
480 pp = db_pretty_printer(db)
482 # read in and parse the file
483 db._parse(ifile.read())
# NOTE(review): eval() runs an expression taken from the command line.
# Setting '__builtins__' to None is not a reliable sandbox, so --valid-record
# must only ever receive trusted input.
486 def valid_record(r) :
487 return eval(options.valid_record, # expression
488 {'__builtins__':None}, # globals
490 path = dgen.inventory(title=options.pdf_title,
492 sort_field=options.sort_field,
493 valid_record=valid_record)
494 print >> ofile, '\n', path
# Mode: build the PDF door warning and print its path.
495 elif options.door_warning:
496 db = text_db(filename=None)
497 pp = db_pretty_printer(db)
499 # read in and parse the file
500 db._parse(ifile.read())
# NOTE(review): same eval() caveat as in the inventory branch.
503 def valid_record(r) :
504 return eval(options.valid_record, # expression
505 {'__builtins__':None}, # globals
507 path = dgen.door_warning(valid_record=valid_record)
508 print >> ofile, '\n', path
# Default mode: print the selected records in long or short format.
510 db = text_db(filename=None)
512 # read in and parse the file
513 db._parse(ifile.read())
514 pp = db_pretty_printer(db)
515 if options.long_format :
516 for id in pp._norm_record_ids(options.print_records) :
517 string = pp.full_record_string_id(id)
519 # pythonize the width option
# The width option is None, 'a', a single integer, or comma-separated
# key:value pairs.
520 if options.width == None or options.width == 'a':
521 width = options.width
522 elif len(options.width.split(':')) == 1 :
523 width = int(options.width)
524 elif len(options.width.split(':')) > 1 :
526 for kv in options.width.split(',') :
528 assert len(spl) == 2, 'invalid width "%s" in "%s"' % (kv, options.width)
530 width[spl[0]] = spl[1]
532 width[spl[0]] = int(spl[1])
534 string = pp.multi_record_string(options.print_records,
535 options.print_fields,
538 print >> ofile, string,
540 close_IOfiles(options.ifilename, ifile,
541 options.ofilename, ofile, options.verbose)