1
2
3
4
5
6
7
8
9
10
11 """ Various storage (molecular and otherwise) functionality
12
13 """
14 from rdkit import RDConfig
15 from rdkit.Dbase import DbModule
16 from rdkit.Dbase.DbConnection import DbConnect
17
def ValidateRDId(id):
    """ returns whether or not an RDId is valid

    An RDId looks like leadText-xxx-xxx-y ('_' is accepted in place of '-').
    It is considered valid when:
      - it has at least four separated fields,
      - every character of the number blocks (all fields between the
        first and the last) is a digit,
      - the last field is an integer equal to the sum of those digits
        modulo 10.

    Returns a true value for a valid id and a false value (0) otherwise;
    malformed ids never raise.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0

    A checksum field that is not a number makes the id invalid:
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    fields = id.replace('_', '-').split('-')
    if len(fields) < 4:
        return 0
    try:
        accum = sum(int(char) for block in fields[1:-1] for char in block)
        # the checksum is parsed inside the same guard so that a non-numeric
        # final field marks the id as invalid instead of raising
        crc = int(fields[-1])
    except ValueError:
        return 0
    return accum % 10 == crc
45
def RDIdToInt(id, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    The number blocks between the lead text and the checksum are read as
    base-1000 digits, most significant block first.  When validate is
    true the id is first checked with ValidateRDId.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate:
        if not ValidateRDId(id):
            raise ValueError("Bad RD Id")
    # drop the lead text (first field) and the checksum (last field)
    blocks = id.replace('_', '-').split('-')[1:-1]
    total = 0
    # walk from the least significant block upwards
    for power, block in enumerate(reversed(blocks)):
        total += int(block) * 1000 ** power
    return total
83
84
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (the sum of the digits of idx, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # Pure integer arithmetic: dividing by the float 1e6 silently loses
    # precision once the index no longer fits in a double.
    millions, remainder = divmod(idx, 1000000)
    thousands, units = divmod(remainder, 1000)

    blocks = []
    if millions:
        # this block is allowed to grow beyond three digits
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    checksum = sum(int(char) for char in str(idx)) % 10
    return leadText + '-' + '-'.join(blocks) + '-' + str(checksum)
137
# NOTE(review): the signature was reconstructed from the call made in
# GetNextRDId; confirm the default value of idColName.
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    Every entry of column idColName in table is converted to its integer
    index (without validation); the result is one more than the largest
    index found, or 1 when there are no entries.

    see RegisterItem for testing/documentation

    """
    highest = 0
    for row in conn.GetData(table=table, fields=idColName):
        current = RDIdToInt(row[0], validate=0)
        if current > highest:
            highest = current
    return highest + 1
151
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    When leadText is empty it is taken from the first id returned for
    column idColName of the table (the text in front of the first
    separator).

    see RegisterItem for testing/documentation

    """
    if not leadText:
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        leadText = firstId.replace('_', '-').split('-')[0]

    nextIdx = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIdx, leadText=leadText)
165
def RegisterItem(conn, table, value, columnName, data=None,
                 id='', idColName='Id', leadText='RDCmpd'):
    """ Registers a single value and returns a 2-tuple (isNew, id)

    The table is searched for a row whose columnName equals value:
      - if one is found nothing is modified and (0, existing id) is returned
      - otherwise the id argument (or, when it is empty, the next available
        RDId) is used; if data is provided the row [id]+data is inserted
        and committed.  (1, id) is returned.

    >>> dbName = RDConfig.RDTestDatabase
    >>> conn = DbConnect(dbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName,
                                                DbModule.placeHolder)
    cursor.execute(lookup, (value,))
    existing = cursor.fetchone()
    if existing:
        # already registered: hand back the stored id
        return 0, existing[0]

    newId = id or GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        conn.InsertData(table, [newId] + list(data))
        conn.Commit()
    return 1, newId
206
def RegisterItems(conn, table, values, columnName, rows,
                  startId='', idColName='Id', leadText='RDCmpd'):
    """ Registers a batch of values, creating RDIds for the unregistered ones

    Arguments:
      - conn: the database connection (GetCursor() and Commit() are used)
      - table: name of the table that is searched and inserted into
      - values: sequence of (hashable) values to register
      - columnName: name of the column searched for the values
      - rows: sequence parallel to values with the remaining column data
        for each value; when it is empty nothing is inserted
      - startId: optional RDId used for the first new entry; when empty
        the next available RDId in the table is used
      - idColName: name of the column holding the RDIds
      - leadText: lead text for newly created RDIds

    Returns a 2-tuple:
      - the number of distinct values that were not registered before
      - the list of RDIds, in the same order as values

    A value that appears several times in values is registered once and
    all of its positions receive the same RDId.

    Raises ValueError if rows is non-empty and its length differs from
    that of values.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)

    # remember every position of each value so that the results can be
    # returned in input order, including repeated values
    positions = {}
    for i, v in enumerate(values):
        positions.setdefault(v, []).append(i)

    curs = conn.GetCursor()
    placeHolder = DbModule.placeHolder
    # the values go through a scratch table so that a single query finds
    # all the ones that are already registered
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    curs.executemany("insert into regitemstemp values (%s)" % placeHolder,
                     [(x,) for x in values])
    query = ('select %s,%s from %s where %s in (select * from regitemstemp)'
             % (columnName, idColName, table, columnName))
    curs.execute(query)
    dbData = curs.fetchall()
    # drop the scratch table, otherwise the next call on this connection
    # fails when trying to create it again
    curs.execute("drop table regitemstemp")

    ids = [None] * nVals
    for val, knownId in dbData:
        for i in positions[val]:
            ids[i] = knownId
    if None not in ids:
        # everything was already registered
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    nextIdx = RDIdToInt(startId)

    nNew = 0
    rowsToInsert = []
    for i, value in enumerate(values):
        if ids[i] is not None:
            continue
        newId = IndexToRDId(nextIdx, leadText=leadText)
        nextIdx += 1
        nNew += 1
        for j in positions[value]:
            ids[j] = newId
        if rows:
            rowsToInsert.append([newId] + list(rows[i]))

    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # join a list of placeholders (not the characters of a repeated
        # string) so that multi-character placeholders such as %s work
        qs = ','.join([placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return nNew, ids
253
254
255
256
257
258
259
260
261
# Extra doctests that do not belong to a single function: ids built by
# IndexToRDId must be accepted by ValidateRDId and converted back to the
# original index by RDIdToInt.
_roundtripTests = """
>>> ValidateRDId(IndexToRDId(100))
1
>>> ValidateRDId(IndexToRDId(10000,leadText='foo'))
1
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
1

"""
# doctest also collects the strings stored in a module-level __test__ dict
__test__ = {"roundtrip":_roundtripTests}
276
def _test():
    """ Runs doctest on the module registered as __main__ and returns
    the result of doctest.testmod

    """
    import doctest
    import sys
    mainModule = sys.modules["__main__"]
    return doctest.testmod(mainModule)
280
if __name__ == '__main__':
    # run the doctests; the number of failures becomes the exit status
    import sys
    nFailed, _nTried = _test()
    sys.exit(nFailed)
285