Package rdkit :: Package Dbase :: Module StorageUtils
[hide private]
[frames | no frames]

Source Code for Module rdkit.Dbase.StorageUtils

  1  # $Id$ 
  2  # 
  3  #  Copyright (C) 2003-2006 Rational Discovery LLC 
  4  # 
  5  #   @@ All Rights Reserved @@ 
  6  #  This file is part of the RDKit. 
  7  #  The contents are covered by the terms of the BSD license 
  8  #  which is included in the file license.txt, found at the root 
  9  #  of the RDKit source tree. 
 10  # 
 11  """ Various storage (molecular and otherwise) functionality 
 12   
 13  """ 
 14  from rdkit import RDConfig 
 15  from rdkit.Dbase import DbModule 
 16  from rdkit.Dbase.DbConnection import DbConnect 
 17   
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    An RDId looks like leadText-xxx-xxx-y (underscores are accepted in
    place of dashes).  It is valid when the sum of all digits in the
    number blocks, modulo 10, equals the trailing checksum field y.

    Arguments:
      - id: the RDId string to check

    Returns:
      a true value when the id is well formed and its checksum matches,
      a false value otherwise.  Malformed ids are reported as invalid
      rather than raising.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0

    A non-numeric checksum field is invalid, not an error:
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    id = id.replace('_', '-')
    splitId = id.split('-')
    # need at least: lead text, two number blocks and the checksum
    if len(splitId) < 4:
        return 0
    accum = 0
    for entry in splitId[1:-1]:
        for char in entry:
            try:
                v = int(char)
            except ValueError:
                # a non-digit inside a number block
                return 0
            accum += v
    try:
        crc = int(splitId[-1])
    except ValueError:
        # empty or non-numeric checksum field
        return 0
    return accum % 10 == crc
45
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    Arguments:
      - id: the RDId string; underscores are accepted in place of dashes
      - validate: when true, the id is checked with ValidateRDId() first
        and a ValueError is raised if that check fails

    The lead text and the trailing checksum field are ignored; the
    remaining blocks are read as base-1000 digits, most significant first.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(id):
        raise ValueError("Bad RD Id")
    # drop the lead text (first field) and the checksum (last field)
    blocks = id.replace('_', '-').split('-')[1:-1]
    total = 0
    # walk from the least significant block upwards
    for power, block in enumerate(reversed(blocks)):
        total += int(block) * 1000 ** power
    return total
83 84
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    Arguments:
      - idx: the non-negative integer index to encode
      - leadText: the text placed in front of the number blocks

    Raises a ValueError if idx is negative.

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (the sum of the decimal digits of idx, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # Pure integer arithmetic: dividing by the float 1e6 rounds large
    # indices and can produce a wrong millions block.
    millions, remainder = divmod(idx, 1000000)
    thousands, units = divmod(remainder, 1000)

    blocks = [leadText]
    if millions:
        # the millions block is only emitted when it is nonzero; it is not
        # split further, so it may be wider than three digits
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    checksum = sum(int(char) for char in str(idx)) % 10
    blocks.append(str(checksum))
    return '-'.join(blocks)
137
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    Arguments:
      - conn: the connection object; its GetData() method is used to
        read the id column
      - table: the name of the table to query
      - idColName: the name of the column holding the RDIds

    Returns one more than the largest integer index found in the id
    column (1 if no ids are found).  The stored ids are converted with
    RDIdToInt() without checksum validation.

    see RegisterItem for testing/documentation

    """
    rows = conn.GetData(table=table, fields=idColName)
    highest = 0
    for row in rows:
        highest = max(highest, RDIdToInt(row[0], validate=0))
    return highest + 1
151
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    Arguments:
      - conn: the connection object used to read the table
      - table: the name of the table to query
      - idColName: the name of the column holding the RDIds
      - leadText: the text to put in front of the new id; when empty it
        is taken from the first id returned for the table

    see RegisterItem for testing/documentation

    """
    if not leadText:
        # borrow the prefix of an existing id (everything before the
        # first separator)
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        leadText = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=leadText)
165
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ registers a single item, reusing its id if it is already present

    Arguments:
      - conn: the connection object
      - table: the name of the table to use
      - value: the value to look for in column columnName
      - columnName: the column that is searched for value
      - data: (optional) the remaining column values for the new row;
        the new id is prepended before the row is inserted.  If this is
        not provided nothing is inserted.
      - id: (optional) the id to use for a new item instead of the next
        available one
      - idColName: the name of the column holding the RDIds
      - leadText: the lead text for generated ids

    Returns a 2-tuple:
      - 0 and the existing id, if value was already in the table
      - 1 and the new id otherwise

    >>> dbName = RDConfig.RDTestDatabase
    >>> conn = DbConnect(dbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    # table and column names are interpolated directly into the SQL;
    # only the searched value goes through a placeholder
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName,
                                                DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    existing = cursor.fetchone()
    if existing:
        return 0, existing[0]

    if not id:
        id = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        newRow = [id]
        newRow.extend(data)
        conn.InsertData(table, newRow)
        # NOTE(review): commit assumed to belong to the insert branch --
        # confirm against the original indentation
        conn.Commit()
    return 1, id
206
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ registers a batch of items, reusing ids of those already present

    Arguments:
      - conn: the connection object
      - table: the name of the table to use
      - values: the values to look for in column columnName.  The values
        are used as dictionary keys to remember their positions, so they
        are assumed to be hashable and unique.
      - columnName: the column that is searched for the values
      - rows: the remaining column values for each item, parallel to
        values; the id is prepended to each row before it is inserted.
        If this is empty no rows are inserted.
      - startId: (optional) the RDId to assign to the first new item; by
        default the next available RDId in the table is used
      - idColName: the name of the column holding the RDIds
      - leadText: the lead text for generated ids

    Returns a 2-tuple:
      - the number of values that were not found in the table
      - the list of ids, in the same order as values

    Raises a ValueError if rows is provided and its length does not
    match that of values.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)
    origOrder = {}
    for i, v in enumerate(values):
        origOrder[v] = i

    curs = conn.GetCursor()
    # Table and column names are interpolated directly into the SQL (they
    # cannot be bound as parameters); the values themselves always go
    # through the DB module's placeholder.
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    curs.executemany("insert into regitemstemp values (%s)" % DbModule.placeHolder,
                     [(x,) for x in values])
    query = ('select %s,%s from %s where %s in (select * from regitemstemp)'
             % (columnName, idColName, table, columnName))
    curs.execute(query)
    dbData = curs.fetchall()
    # drop the scratch table so that a later call on the same connection
    # can create it again
    curs.execute("drop table regitemstemp")

    # put the ids that are already registered at the positions of their values
    ids = [None] * nVals
    for val, existingId in dbData:
        ids[origOrder[val]] = existingId
    if dbData and len(dbData) == nVals:
        # everything was already there
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    nextIdx = RDIdToInt(startId)

    rowsToInsert = []
    for i in range(nVals):
        if ids[i] is not None:
            continue
        newId = IndexToRDId(nextIdx, leadText=leadText)
        nextIdx += 1
        ids[i] = newId
        if rows:
            row = [newId]
            row.extend(rows[i])
            rowsToInsert.append(row)

    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # repeat the placeholder as a list element: multiplying the string
        # itself would split multi-character placeholders such as '%s'
        qs = ','.join([DbModule.placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return len(values) - len(dbData), ids
#------------------------------------
#
#  doctest boilerplate
#
# Extra doctests that do not belong to a single function: converting an
# index to an RDId and back must give a valid id and the original index.
_roundtripTests = """
>>> ValidateRDId(IndexToRDId(100))
1
>>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
1
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
1

"""
# doctest looks up the module-level __test__ dictionary and runs the
# strings it contains as additional tests
__test__ = {"roundtrip":_roundtripTests}
277 -def _test():
278 import doctest,sys 279 return doctest.testmod(sys.modules["__main__"])
if __name__ == '__main__':
    # run the doctests and use the number of failures as the exit status
    import sys
    nFailed, nTried = _test()
    sys.exit(nFailed)