DAMASK_EICMD/lib/damask/asciitable.py

372 lines
14 KiB
Python
Raw Normal View History

# -*- coding: UTF-8 no BOM -*-
# $Id$
import os,sys
import numpy as np
2011-12-14 01:32:26 +05:30
class ASCIItable():
'''
There should be a doc string here :)
'''
2011-12-14 01:32:26 +05:30
__slots__ = ['__IO__',
'info',
'labels',
'data',
]
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def __init__(self,
fileIn = sys.stdin,
fileOut = sys.stdout,
buffered = False, # flush writes
labels = True): # assume table has labels
2011-12-14 01:32:26 +05:30
self.__IO__ = {'in': fileIn,
'out':fileOut,
'output':[],
'buffered':buffered,
'labels':labels,
2011-12-14 01:32:26 +05:30
'validReadSize': 0,
'readBuffer': [], # buffer to hold non-advancing reads
'dataStart': 0,
2011-12-14 01:32:26 +05:30
}
self.info = []
self.labels = []
self.data = []
# ------------------------------------------------------------------
def _transliterateToFloat(self,x):
try:
return float(x)
except:
return 0.0
# ------------------------------------------------------------------
def close(self,dismiss = False):
self.input_close()
self.output_close(dismiss)
# ------------------------------------------------------------------
def input_close(self):
self.__IO__['in'].close()
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def output_write(self,
what):
'''
aggregate a single row (string) or list of (possibly containing further lists of) rows into output
'''
if not isinstance(what, (str, unicode)):
try:
for item in what: self.output_write(item)
except:
self.__IO__['output'] += [str(what)]
2011-12-14 01:32:26 +05:30
else:
self.__IO__['output'] += [what]
return self.__IO__['buffered'] or self.output_flush()
2011-12-14 01:32:26 +05:30
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def output_flush(self,
clear = True):
try:
self.__IO__['output'] == [] or self.__IO__['out'].write('\n'.join(self.__IO__['output']) + '\n')
except(IOError) as e:
return False
2011-12-14 01:32:26 +05:30
if clear: self.output_clear()
return True
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def output_clear(self):
self.__IO__['output'] = []
# ------------------------------------------------------------------
def output_close(self, dismiss = False):
self.__IO__['out'].close()
if dismiss: os.remove(self.__IO__['out'].name)
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def head_read(self):
'''
get column labels by either read the first row, or
--if keyword "head[*]" is present-- the last line of the header
'''
2011-12-18 21:19:44 +05:30
import re
2011-12-14 01:32:26 +05:30
try:
self.__IO__['in'].seek(0)
except:
pass
firstline = self.__IO__['in'].readline()
m = re.search('(\d+)\s+head', firstline.lower())
if self.__IO__['labels']: # table features labels
if m: # found header info
self.info = [self.__IO__['in'].readline().strip() for i in xrange(1,int(m.group(1)))]
self.labels = self.__IO__['in'].readline().split()
else: # no header info (but labels)
self.labels = firstline.split()
self.__IO__['validReadSize'] = len(self.labels)
else: # no labels present in table
if m: # found header info
self.info = [self.__IO__['in'].readline().strip() for i in xrange(0,int(m.group(1)))] # all header is info
# ... without any labels
try:
self.__IO__['dataStart'] = self.__IO__['in'].tell() # current file position is at start of data
except(IOError):
pass
2011-12-14 01:32:26 +05:30
if self.__IO__['validReadSize'] == 0: # in case no valid data length is known
self.data_read(advance = False)
self.__IO__['validReadSize'] = len(self.data) # assume constant data width from first line
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def head_write(self):
'''
write current header information (info + labels)
'''
if self.__IO__['labels']:
return self.output_write ([
'%i\theader'%(len(self.info)+1),
self.info,
'\t'.join(self.labels),
])
else:
return self.output_write ([
'%i\theader'%(len(self.info)),
self.info,
])
2011-12-14 01:32:26 +05:30
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def labels_append(self,
what):
'''
add item or list to existing set of labels (and switch on labeling)
'''
if not isinstance(what, (str, unicode)):
try:
for item in what: self.labels_append(item)
except:
self.labels += [str(what)]
else:
self.labels += [what]
2011-12-14 01:32:26 +05:30
self.__IO__['labels'] = True # switch on processing (in particular writing) of labels
# ------------------------------------------------------------------
def labels_clear(self):
'''
delete existing labels and switch to no labeling
'''
self.labels = []
self.__IO__['labels'] = False
# ------------------------------------------------------------------
def label_index(self,
labels):
'''
tell index of column label(s).
return numpy array if asked for list of labels.
transparently deals with label positions implicitly given as numbers or their headings given as strings.
'''
if isinstance(labels,list): # check whether list of labels is requested
idx = []
for label in labels:
if label != None:
try:
idx.append(int(label)) # column given as integer number?
except ValueError:
try:
idx.append(self.labels.index(label)) # locate string in label list
except ValueError:
idx.append(-1) # not found...
else:
try:
idx = int(labels)
except ValueError:
try:
idx = self.labels.index(labels)
except(ValueError):
idx = None if labels == None else -1
return np.array(idx) if isinstance(idx,list) else idx
# ------------------------------------------------------------------
def labels_dimension(self,
labels):
'''
tell dimension (length) of column label(s).
return numpy array if asked for list of labels.
transparently deals with label positions implicitly given as numbers or their headings given as strings.
'''
if isinstance(labels,list): # check whether list of labels is requested
dim = []
for label in labels:
if label != None:
try:
idx.append(int(label)) # column given as integer number?
except ValueError:
try:
idx.append(self.labels.index(label)) # locate string in label list
except ValueError:
idx.append(-1) # not found...
else:
try:
idx = int(labels)
except ValueError:
try:
idx = self.labels.index(labels)
except(ValueError):
idx = None if labels == None else -1
return np.array(idx) if isinstance(idx,list) else idx
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def info_append(self,
what):
'''
add item or list to existing set of infos
'''
if not isinstance(what, (str, unicode)):
try:
for item in what: self.info_append(item)
except:
self.info += [str(what)]
else:
self.info += [what]
2011-12-14 01:32:26 +05:30
# ------------------------------------------------------------------
def info_clear(self):
'''
delete any info block
'''
self.info = []
# ------------------------------------------------------------------
def data_rewind(self):
self.__IO__['in'].seek(self.__IO__['dataStart']) # position file to start of data section
self.__IO__['readBuffer'] = [] # delete any non-advancing data reads
# ------------------------------------------------------------------
def data_skipLines(self,count):
'''
wind forward by count number of lines
'''
for i in xrange(count):
alive = self.data_read()
return alive
# ------------------------------------------------------------------
def data_read(self,advance = True):
'''
read next line (possibly buffered) and parse it into data array
'''
if len(self.__IO__['readBuffer']) > 0:
line = self.__IO__['readBuffer'].pop(0) # take buffered content
else:
line = self.__IO__['in'].readline() # get next data row from file
if not advance:
self.__IO__['readBuffer'].append(line) # keep line just read in buffer
if self.__IO__['labels']:
items = line.split()[:self.__IO__['validReadSize']] # use up to valid size (label count)
self.data = items if len(items) == self.__IO__['validReadSize'] else [] # take if correct number of entries
else:
self.data = line.split() # take all
return self.data != []
# ------------------------------------------------------------------
def data_readLine(self,line):
'''
seek beginning of data and wind forward to selected line
'''
self.__IO__['in'].seek(self.__IO__['dataStart'])
for i in xrange(line-1):
self.__IO__['in'].readline()
self.data_read()
# ------------------------------------------------------------------
def data_readArray(self,
labels = []):
'''
read whole data of all (given) labels as numpy array
'''
if not isinstance(labels,list):
labels = [labels]
if labels == [None] or labels == []:
use = np.arange(self.__IO__['validReadSize']) # use all columns (and keep labels intact)
labels_missing = []
else:
indices = self.label_index(labels) # check requested labels
present = np.where(indices >= 0)[0] # positions in request list of labels that are present ...
missing = np.where(indices < 0)[0] # ... and missing in table
labels_missing = np.array( labels) [missing] # corresponding labels ...
self.labels = list(np.array(self.labels)[indices[present]]) # ... for missing and present columns
self.__IO__['validReadSize'] = len(present) # update data width
use = indices[present]
try:
self.data_rewind() # try to wind back to start of data
except:
pass # assume/hope we are at data start already...
self.data = np.loadtxt(self.__IO__['in'], usecols=use,ndmin=2)
return labels_missing
# ------------------------------------------------------------------
def data_write(self,delimiter = '\t'):
'''
write current data array and report alive output back
'''
if len(self.data) == 0: return True
2011-12-14 01:32:26 +05:30
if isinstance(self.data[0],list):
return self.output_write([delimiter.join(map(str,items)) for items in self.data])
2011-12-14 01:32:26 +05:30
else:
return self.output_write(delimiter.join(map(str,self.data)))
2011-12-14 01:32:26 +05:30
# ------------------------------------------------------------------
def data_writeArray(self,format = '%g',delimiter = '\t'):
'''
write whole numpy array data
'''
return np.savetxt(self.__IO__['out'],self.data,fmt = format,delimiter = delimiter)
# ------------------------------------------------------------------
2011-12-14 01:32:26 +05:30
def data_append(self,
what):
if not isinstance(what, (str, unicode)):
try:
for item in what: self.data_append(item)
except:
self.data += [str(what)]
else:
self.data += [what]
# ------------------------------------------------------------------
def data_set(self,
what,where):
idx = -1
try:
idx = self.labels.index(where)
if len(self.data) <= idx:
self.data_append(['n/a' for i in xrange(idx+1-len(self.data))]) # grow data if too short
self.data[idx] = str(what)
except(ValueError):
pass
return idx
# ------------------------------------------------------------------
def data_clear(self):
self.data = []
# ------------------------------------------------------------------
def data_asFloat(self):
return map(self._transliterateToFloat,self.data)