4 Extend text_db with a CAS# validator, a command line interface, and document generation.
def valid_CASno(cas_string, debug=False):
    """
    Check N..NN-NN-N format, and the checksum digit for valid CAS number structure.
    see http://www.cas.org/expertise/cascontent/registry/checkdig.html
    for N_n .. N_4 N_3 - N_2 N_1 - R
      R = remainder([sum_{i=1}^n i N_i ] / 10 )
    Ignores 'na' and '+secret-non-hazardous'

    cas_string -- the CAS number text to check.
    debug      -- when true, explain rejections on stderr.
    Returns True for a well-formed number with a matching check digit
    (or one of the ignored placeholders), False otherwise.

    >>> valid_CASno('107-07-3')
    True
    >>> valid_CASno('107-08-3')
    False
    >>> valid_CASno('107-083')
    False
    """
    # 'na' marks a non-existent CAS#; '+secret-non-hazardous' marks items
    # with secret, non-hazardous ingredients for which we have no CAS#.
    # NOTE(review): both are accepted as valid, per "Ignores" above -- confirm.
    if cas_string in ('na', '+secret-non-hazardous'):
        return True

    # ^ matches the start of the string, \Z matches the very end (unlike $,
    # it does not tolerate a trailing newline).  Raw string keeps \Z intact.
    if re.match(r'^[0-9]{2,}-[0-9]{2}-[0-9]\Z', cas_string) is None:
        if debug:
            stderr.write("invalid CAS# format: '%s'\n" % cas_string)
        return False

    digits = cas_string.replace('-', '')
    # Weight the digits 1, 2, 3, ... starting from the one just left of the
    # check digit (N_1) and moving leftwards, then reduce modulo 10.
    check = sum(weight * int(digit)
                for weight, digit in enumerate(reversed(digits[:-1]), 1)) % 10
    if int(digits[-1]) == check:
        return True
    if debug:
        stderr.write("invalid CAS# check: '%s' (expected %d)\n"
                     % (cas_string, check))
    return False
# MSDS files are addressed by database record ID: basepath() joins self.dir
# with the integer ID and path() appends the extension that MIME_ext() picks
# from the parallel MIMEs / MIME_exts lists.
53 class MSDS_manager (object) :
55 Manage Material Saftey Data Sheets (MSDSs)
# db is the record database (get_all iterates self.db.records()).
# dir is the MSDS directory -- presumably stored as self.dir; confirm.
57 def __init__(self, db, dir="./MSDS/") :
60 self.MIMEs = ['application/pdf',
63 self.MIME_exts = ['pdf', 'html', 'txt']
66 "Create the MSDS directory if it's missing"
67 if os.path.isdir(self.dir) :
68 return # all set to go
69 elif os.path.exists(self.dir) :
# NOTE(review): "raise Exception, msg" is Python 2-only syntax; the portable
# spelling is raise Exception(msg).
70 raise Exception, "Error: a non-directory file exists at %s" % self.dir
# Path of the MSDS for ID inside self.dir, without a file extension.
# NOTE(review): type(id) == type(1) rejects int subclasses (and Python 2
# long); the check is an assert, so it disappears under "python -O".
73 def basepath(self, id) :
74 assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
76 return os.path.join(self.dir, "%d" % id)
# Basepath variant used by link() to build the symlink target -- presumably
# relative to the MSDS directory so the links survive a move; confirm.
77 def local_basepath(self, id) : # for symbolic links
78 assert type(id) == type(1), 'id must be an integer, not %s (%s)' \
81 def MIME_ext(self, mime) :
# Look up the file extension at the same index as MIME in self.MIMEs.
82 assert mime in self.MIMEs, \
83 "invalid MIME type '%s'\nshould be one of %s" % (mime, self.MIMEs)
84 i = self.MIMEs.index(mime)
85 ext = self.MIME_exts[i]
# Full path "<basepath>.<ext>" for ID's MSDS of the given MIME type.
87 def path(self, id, mime) :
88 return "%s.%s" % (self.basepath(id), self.MIME_ext(mime))
# Same as path(), but built on local_basepath() (used as a symlink target).
89 def local_path(self, id, mime) :
90 return "%s.%s" % (self.local_basepath(id), self.MIME_ext(mime))
# NOTE(review): file() exists only in Python 2, and the file object is never
# closed explicitly.  The trailing comma suppresses print's newline.  The
# docstring says FILE but the parameter is named filetext.
91 def save(self, id, filetext, mime='application/pdf') :
92 "Save the binary byte string FILE to the path for ID"
93 print >> file(self.path(id, mime), 'wb'), filetext,
# Make ID share TARGET_ID's MSDS by symlinking to it with the same extension.
94 def link(self, id, target_id) :
95 # target_id already exists, create a symlink to it for id.
96 target_mime = self.get_MSDS_MIME(target_id)
97 target_path = self.local_path(target_id, target_mime)
98 path = self.path(id, target_mime)
99 #os.link(self.path(target_id), self.path(id)) # hard link...
100 os.symlink(target_path, path) # ... or soft link
# True when a file exists at path(id, mime).
# NOTE(review): the examples call has_MSDS_type(id, 'pdf'), but this method
# is named has_MSDS_MIME and MIME_ext() asserts a full MIME type such as
# 'application/pdf' -- the examples look stale.
101 def has_MSDS_MIME(self, id, mime) :
103 >>> m = MSDS_manager(db=None)
104 >>> print m.has_MSDS_type(102, 'pdf') # test on html
106 >>> print m.has_MSDS_type(102, 'html') # test on html
108 >>> print m.has_MSDS_type(6, 'pdf') # test on pdf symlink
111 return os.path.exists(self.path(id, mime))
# Path of the first MIME type, in self.MIMEs order, that has a file for ID.
# Presumably None when there is none (has_MSDS compares against None).
112 def get_MSDS_path(self, id) :
114 >>> m = MSDS_manager(db=None)
115 >>> print m.get_MSDS_path(102) # test on html
117 >>> print m.get_MSDS_path(1) # test on pdf
119 >>> print m.get_MSDS_path(6) # test on pdf symlink
122 for mime in self.MIMEs :
123 if self.has_MSDS_MIME(id, mime) :
124 return self.path(id, mime)
# Same search as get_MSDS_path, reporting the MIME type rather than the path
# (link() uses the result as a MIME type).
126 def get_MSDS_MIME(self, id) :
128 >>> m = MSDS_manager(db=None)
129 >>> print m.get_MSDS_MIME(102) # test on html
131 >>> print m.get_MSDS_MIME(1) # test on pdf
133 >>> print m.get_MSDS_MIME(6) # test on pdf symlink
136 for mime in self.MIMEs :
137 if self.has_MSDS_MIME(id, mime) :
# NOTE(review): "is None" is the idiomatic test for None.
140 def has_MSDS(self, id) :
141 if self.get_MSDS_path(id) == None :
# Collect {'ID', 'Name'} dicts for database records; with simlinks=False,
# records whose MSDS path is a symbolic link are skipped.
144 def get_all(self, simlinks=True) :
146 for record in self.db.records() :
147 p = self.get_MSDS_path( int(record['ID']) )
149 if simlinks == False and os.path.islink( p ) :
150 continue # ignore the symbolic link
151 ret.append({'ID':record['ID'], 'Name':record['Name']})
# docgen writes LaTeX fragments under ./docs/ (inventory_data.tex,
# door_data.tex, main.tex) and builds PDFs from them via "make pdf".
# NOTE(review): files are written with "print >> file(...)" (Python 2 only,
# never closed explicitly), and _make_pdf / door_warning run shell command
# lines through os.system with values interpolated into the string;
# subprocess with an argument list and shutil.copy would be safer.
154 class docgen (object) :
155 "Generate the officially required documents"
156 def __init__(self, db) :
# Escape %, > and < for LaTeX.
# NOTE(review): other LaTeX specials (&, _, #, $, {, }, backslash) pass
# through unescaped.  '\%' is an unrecognised escape that Python keeps as
# backslash + percent; '\\%' or a raw string states that explicitly.
158 def _latex_safe(self, string):
159 string = string.replace('%', '\%')
160 string = string.replace('>', '$>$')
161 string = string.replace('<', '$<$')
# Rewrite ./docs/main.tex; callers pass 'inventory_template' or
# 'door_template' as TARGET.
163 def _set_main_target(self, target):
164 print >> file('./docs/main.tex', 'w'), \
165 """\documentclass[letterpaper]{article}
169 def _make_pdf(self, target_file):
170 os.system('cd ./docs && make pdf')
171 path = os.path.join('./docs/', target_file)
172 os.system('cp ./docs/main.pdf %s' % path)
174 def inventory(self, namewidth='a') :
175 "Create a pdf list of all currently owned chemicals."
176 pp = db_pretty_printer(self.db)
178 for record in self.db.records() :
179 if record['Disposed'] == '' : # get ids for chemicals we still have
180 active_ids.append(record['db_id'])
181 active_fields = ['ID', 'Name', 'Amount',
182 'H', 'F', 'R', 'O', 'M', 'C', 'T']
184 for field in active_fields :
186 width['Name'] = namewidth
188 #string = "Chemical inventory:\t\tGenerated on %s\n\n" \
189 # % time.strftime('%Y-%m-%d')
190 #string += pp.multi_record_string(active_ids, active_fields,
191 # width=width, FS=' ')
194 string = "\\begin{longtable}{l l l c c c c c c c}\n"
195 string += ('%% The header for the remaining page(s) of the table...\n'
196 'ID & Name & Amount & H & F & R & O & M & C & T \\\\\n'
199 for db_id in active_ids :
200 record = self.db.record(db_id)
201 string += " %s & %s & %s & %s & %s & %s & %s & %s & %s & %s \\\\\n" \
202 % (self._latex_safe(record['ID']),
203 self._latex_safe(record['Name']),
204 self._latex_safe(record['Amount']),
205 self._latex_safe(record['H']),
206 self._latex_safe(record['F']),
207 self._latex_safe(record['R']),
208 self._latex_safe(record['O']),
209 self._latex_safe(record['M']),
210 self._latex_safe(record['C']),
211 self._latex_safe(record['T']))
212 string += "\\end{longtable}\n"
213 print >> file('./docs/inventory_data.tex', 'w'), string
214 ## alter main.tex to point to the inventory template.
215 self._set_main_target('inventory_template')
217 path = self._make_pdf('inventory.pdf')
219 def door_warning(self,
220 valid_record=lambda r: r['Disposed'] == '') :
221 """create a warning NFPA diamond and list of the most dangerous
222 chemicals for which valid_record(record) is true. For
223 example, to generate a door warning for the front door use
224 door_warning(lambda r: r['Disposed'] == '')
225 or to generate the warning for the fridge
226 door_warning(lambda r: r['Location'] == 'Refrigerator')
227 Note that valid_record defaults to the first example.
229 pp = db_pretty_printer(self.db)
230 all_ids = range(self.db.len_records())
232 # Search the database to find the nasties
# NFPA_maxs tracks the highest integer rating seen for H, F and R, and the
# list of distinct 'O' codes seen ('OX' and 'W' are tested further down).
233 NFPA_maxs = {'H':0, 'F':0, 'R':0, 'O':[]}
241 for record in self.db.records() :
242 if valid_record(record) :
243 for field in ['H', 'F', 'R', 'O'] :
# '' and '?' are skipped as "no rating" before int() is applied.
245 if r != '' and r != '?' :
246 if field != 'O' and int(r) > NFPA_maxs[field] :
247 NFPA_maxs[field] = int(r)
248 elif field == 'O' and not r in NFPA_maxs['O'] :
249 NFPA_maxs[field].append(r)
# Collect the db_id of every record whose M, C or T field is set
# (non-empty and not '?').
250 for field,array in zip(['M','C','T'],
254 if record[field] != '' and record[field] != '?':
255 array.append(record['db_id'])
256 # now that we've found the max NFPAs,
257 # find all the chemicals at those levels
258 for record in self.db.records() :
259 if valid_record(record) :
260 for field,array in zip(['H', 'F', 'R', 'O'],
262 Reactivities, Others]) :
264 if r != '' and r != '?' :
265 if field != 'O' and int(r) == NFPA_maxs[field] :
266 array.append(record['db_id'])
267 elif field == 'O' and r in NFPA_maxs['O'] :
268 array.append(record['db_id'])
270 ## generate the output
271 # first, update the NFPA graphic code
272 if 'OX' in NFPA_maxs['O'] : OX = 'y'
274 if 'W' in NFPA_maxs['O'] : W = 'y'
# The generator script receives the three maxima plus the OX and W flags and
# its stdout is redirected into NFPA.mp.
276 os.system('./docs/mp/gen_NFPA.sh %d %d %d %s %s > ./docs/mp/NFPA.mp'
277 % (NFPA_maxs['H'], NFPA_maxs['F'], NFPA_maxs['R'], OX, W))
278 # now generate a list of the nasties ( Amount & ID & Name )
279 string = "\\begin{tabular}{r r l}\n"
280 for field,name,array in zip(['H', 'F', 'R', 'O'],
282 'Reactivity', 'Other'],
284 Reactivities, Others]) :
# NOTE(review): "\m" and "\T" below are unrecognised escapes in non-raw
# strings; Python keeps the backslash but newer versions warn about it.
# "\\multicolumn" / "\\Tstrut" would match the "\\begin" spelling used above.
285 string += " \multicolumn{3}{c}{\Tstrut %s : %s} \\\\\n" \
286 % (name, NFPA_maxs[field])
288 record = self.db.record(db_id)
289 string += " %s & %s & %s \\\\\n" \
290 % (self._latex_safe(record['Amount']),
291 self._latex_safe(record['ID']),
292 self._latex_safe(record['Name']))
294 string += " \multicolumn{3}{c}{ --- } \\\\\n"
295 for hazard,array in zip(['Mutagens','Carcinogens','Teratogens'],
296 [Mutagens, Carcinogens, Teratogens]) :
297 string += " \multicolumn{3}{c}{\Tstrut %s} \\\\\n" % (hazard)
299 record = self.db.record(db_id)
300 string += " %s & %s & %s \\\\\n" \
301 % (self._latex_safe(record['Amount']),
302 self._latex_safe(record['ID']),
303 self._latex_safe(record['Name']))
305 string += " \multicolumn{3}{c}{ --- } \\\\\n"
306 string += "\\end{tabular}\n"
307 print >> file('./docs/door_data.tex', 'w'), string
308 ## alter main.tex to point to the door template.
309 self._set_main_target('door_template')
311 path = self._make_pdf('door_warning.pdf')
def open_IOfiles(ifilename=None, ofilename=None, debug=False):
    """Open the input and output streams for the command line interface.

    ifilename -- path to read from; stdin is used when empty or None.
    ofilename -- path to write to (truncated); stdout when empty or None.
    debug     -- when true, report each file being opened on stderr.

    Returns the tuple (ifile, ofile).  Pair with close_IOfiles(), which
    closes only the streams that were opened from a filename.
    """
    if ifilename:
        if debug:
            stderr.write("open input file '%s'\n" % ifilename)
        # open() rather than the Python 2-only file() builtin
        ifile = open(ifilename, 'r')
    else:
        ifile = stdin

    if ofilename:
        if debug:
            stderr.write("open output file '%s'\n" % ofilename)
        try:
            ofile = open(ofilename, 'w')
        except IOError:
            # do not leak the input handle we just opened
            if ifilename:
                ifile.close()
            raise
    else:
        ofile = stdout
    return (ifile, ofile)
def close_IOfiles(ifilename=None, ifile=stdin,
                  ofilename=None, ofile=stdout,
                  debug=False):
    """Close the streams that open_IOfiles() opened from filenames.

    A stream is closed only when its filename is set, so the stdin/stdout
    fallbacks are left open.

    ifilename, ofilename -- the names originally given to open_IOfiles().
    ifile, ofile         -- the matching stream objects.
    debug                -- when true, report each close on stderr.
    """
    try:
        if ifilename:
            if debug:
                stderr.write("close input file '%s'\n" % ifilename)
            ifile.close()
    finally:
        # still release the output file if closing the input failed
        if ofilename:
            if debug:
                stderr.write("close output file '%s'\n" % ofilename)
            ofile.close()
# Command line interface: read a database from the input stream and either
# validate its CAS numbers (-V) or pretty-print records in long or short form.
342 if __name__ == "__main__" :
343 from optparse import OptionParser
345 parser = OptionParser(usage="usage: %prog [options]", version="%prog 0.1")
347 parser.add_option('-f', '--input-file', dest="ifilename",
348 help="Read input from FILE (default stdin)",
349 type='string', metavar="FILE")
350 parser.add_option('-o', '--output-file', dest="ofilename",
351 help="Write output to FILE (default stdout)",
352 type='string', metavar="FILE")
353 parser.add_option('-d', '--delimiter', dest="FS", # field separator
354 help="Set field delimiter (default '%default')",
355 type='string', metavar="DELIM", default='\t')
356 parser.add_option('-p', '--print-fields', dest="print_fields",
357 help="Only print certain fields (e.g. 0,3,4,2)",
358 type='string', metavar="FIELDS")
359 parser.add_option('-r', '--print-records', dest="print_records",
360 help="Only print certain records (e.g. 0:3)",
361 type='string', metavar="RECORDS")
362 parser.add_option('-w', '--column-width', dest="width",
363 help="Set column width for short-format output.",
364 type='string', metavar="WIDTH")
# -L and -l share dest="long_format"; whichever comes last on the command
# line wins.
365 parser.add_option('-L', '--long-format', dest="long_format",
366 help="Print long format (several lines per record)",
367 action='store_true', default=False)
# NOTE(review): help text typo, "one lines per record" should read "one line".
368 parser.add_option('-l', '--short-format', dest="long_format",
369 help="Print short format (default) (one lines per record)",
370 action='store_false', default=False)
# NOTE(review): the help says "docutils"; the >>> examples in this file are
# doctest-style, so "doctest" is presumably meant -- confirm.
371 parser.add_option('-t', '--test', dest="test",
372 help="Run docutils tests on db.py",
373 action="store_true", default=False)
374 parser.add_option('-V', '--validate', dest="validate",
375 help="Validate CAS#s (no other output)",
376 action="store_true", default=False)
377 parser.add_option('-v', '--verbose', dest="verbose",
378 help="Print lots of debugging information",
379 action="store_true", default=False)
381 (options, args) = parser.parse_args()
384 ifile,ofile = open_IOfiles(options.ifilename, options.ofilename,
389 elif options.validate :
390 db = text_db(filename=None)
391 pp = db_pretty_printer(db)
393 # read in and parse the file
394 db._parse(ifile.read())
396 CAS_DELIM = ',' # separate CAS entries for chemicals with multiple CAS numbers
397 PERCENT_DELIM = ':' # separate CAS number from ingredient percentage
398 for record in db.records() :
# NOTE(review): assuming cas is a str, cas.split(CAS_DELIM) always returns at
# least one element, so "== 0" is never true and this branch is unreachable;
# "== 1" was probably intended.  The else branch already copes with a single
# CAS number, so the results stay correct either way.
401 if len(cas.split(CAS_DELIM)) == 0 : # cas = 'N...N-NN-N'
402 if not valid_CASno(cas, options.verbose) :
404 print >> ofile, "Invalid CAS# in record: '%s'" % cas
405 else : # cas = 'N...N-NN-N:X%,N...N-NN-N:Y%,...'
406 for casterm in cas.split(CAS_DELIM) : # casterm = 'N...N-NN-N:X%'
407 c = casterm.split(PERCENT_DELIM)[0] # c = 'N...N-NN-N'
408 if not valid_CASno(c, options.verbose) :
# NOTE(review): this message says "CAS*" where the branch above says "CAS#";
# looks like a typo.
410 print >> ofile, "Invalid CAS* in record: '%s'" % c
412 print >> ofile, "in record %s: %s" % (record['ID'], record['Name'])
413 #pp.full_record_string(record)
416 db = text_db(filename=None)
418 # read in and parse the file
419 db._parse(ifile.read())
420 pp = db_pretty_printer(db)
421 if options.long_format :
422 for id in pp._norm_record_ids(options.print_records) :
423 string = pp.full_record_string_id(id)
425 # pythonize the width option
# Accepted -w forms: a single integer, 'a', or comma-separated "field:width"
# pairs where each width is an integer or 'a'.
# NOTE(review): when -w is omitted options.width is None, so the first test
# is false and the elif below would call None.split(':') -- looks like an
# AttributeError for the default short-format run; confirm.
426 if (options.width != None
427 and options.width != 'a'
428 and len(options.width.split(':')) == 1
430 width = int(options.width)
431 elif len(options.width.split(':')) > 1 :
433 for kv in options.width.split(',') :
435 assert len(spl) == 2, 'invalid width "%s" in "%s"' % (kv, options.width)
437 width[spl[0]] = spl[1]
439 width[spl[0]] = int(spl[1])
441 string = pp.multi_record_string(options.print_records,
442 options.print_fields,
445 print >> ofile, string,
447 close_IOfiles(options.ifilename, ifile,
448 options.ofilename, ofile, options.verbose)