-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathimport_bika_setup.py
402 lines (370 loc) · 14 KB
/
import_bika_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
from AccessControl.SecurityManagement import newSecurityManager
from Products.Archetypes import Field
from Products.ATExtensions.ateapi import RecordField, RecordsField
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone.factory import _DEFAULT_PROFILE
from Products.CMFPlone.factory import addPloneSite
import openpyxl
import argparse
import os
import pprint
import shutil
import tempfile
import transaction
import zipfile
# def excepthook(typ, value, tb):
# import pudb as pdb
# import traceback
# traceback.print_exception(typ, value, tb)
# pdb.pm()
# pdb.set_trace()
# import sys; sys.excepthook = excepthook
# If creating a new Plone site:
from zope.component.hooks import setSite
default_profiles = [
'plonetheme.classic:default',
'plonetheme.sunburst:default',
'plone.app.caching:default',
'bika.lims:default',
]
export_types = [
'Client',
'Contact',
'ARPriority',
'AnalysisProfile',
'ARTemplate',
'AnalysisCategory',
'AnalysisService',
'AnalysisSpec',
'AttachmentType',
'BatchLabel',
'Calculation',
'Container',
'ContainerType',
'Department',
'Instrument',
'InstrumentCalibration',
'InstrumentCertification',
'InstrumentMaintenanceTask',
'InstrumentScheduledTask',
'InstrumentType',
'InstrumentValidation',
'LabContact',
'LabProduct',
'Manufacturer',
'Method',
'Preservation',
'ReferenceDefinition',
'SampleCondition',
'SampleMatrix',
'StorageLocation',
'SamplePoint',
'SampleType',
'SamplingDeviation',
'SRTemplate',
'SubGroup',
'Supplier',
'SupplierContact',
'WorksheetTemplate',
]
class Main:
def __init__(self, args):
self.args = args
self.deferred = []
def __call__(self):
"""Export entire bika site
"""
# pose as user
self.user = app.acl_users.getUserById(self.args.username)
newSecurityManager(None, self.user)
# get or create portal object
try:
self.portal = app.unrestrictedTraverse(self.args.sitepath)
except KeyError:
self.portal = self.create_site()
setSite(self.portal)
# Extract zipfile
self.tempdir = tempfile.mkdtemp()
zf = zipfile.ZipFile(self.args.inputfile, 'r')
zf.extractall(self.tempdir)
# Open workbook
self.wb = openpyxl.load_workbook(
os.path.join(self.tempdir, 'setupdata.xlsx'))
# Import
self.import_laboratory()
self.import_bika_setup()
for portal_type in export_types:
self.import_portal_type(portal_type)
# Remove tempdir
shutil.rmtree(self.tempdir)
# Resolve deferred/circular references
self.solve_deferred()
# Rebuild catalogs
for c in ['bika_analysis_catalog',
'bika_catalog',
'bika_setup_catalog',
'portal_catalog']:
print 'rebuilding %s' % c
self.portal[c].clearFindAndRebuild()
transaction.commit()
def create_site(self):
profiles = default_profiles
if self.args.profiles:
profiles.extend(self.args.profiles)
addPloneSite(
app,
self.args.sitepath,
title=self.args.title,
profile_id=_DEFAULT_PROFILE,
extension_ids=profiles,
setup_content=True,
default_language=self.args.language
)
self.portal = app.unrestrictedTraverse(self.args.sitepath)
return self.portal
def get_catalog(self, portal_type):
"""grab the first catalog we are indexed in
"""
at = getToolByName(self.portal, 'archetype_tool')
return at.getCatalogsByType(portal_type)[0]
def resolve_reference_ids_to_uids(self, instance, field, value):
"""Get target UIDs for any ReferenceField.
If targets do not exist, the requirement is added to deferred.
"""
# We make an assumption here, that if there are multiple allowed
# types, they will all be indexed in the same catalog.
target_type = field.allowed_types \
if isinstance(field.allowed_types, basestring) \
else field.allowed_types[0]
catalog = self.get_catalog(target_type)
# The ID is what is stored in the export, so first we must grab these:
if field.multiValued:
# multiValued references get their values stored in a sheet
# named after the relationship.
ids = []
if field.relationship[:31] not in self.wb:
return None
ws = self.wb[field.relationship[:31]]
ids = []
for rownr, row in enumerate(ws.rows):
if rownr == 0:
keys = [cell.value for cell in row]
continue
rowdict = dict(zip(keys, [cell.value for cell in row]))
if rowdict['Source'] == instance.id:
ids.append(rowdict['Target'])
if not ids:
return []
final_value = []
for vid in ids:
brain = catalog(portal_type=field.allowed_types, id=vid)
if brain:
final_value.append(brain[0].getObject())
else:
self.defer(instance, field, catalog,
field.allowed_types, vid)
return final_value
else:
if value:
brain = catalog(portal_type=field.allowed_types, id=value)
if brain:
return brain[0].getObject()
else:
self.defer(instance, field, catalog,
field.allowed_types, value)
return None
def resolve_records(self, instance, field, value):
# RecordField and RecordsField
# We must re-create the dict (or list of dicts) from sheet values
ws = self.wb[value]
matches = []
for rownr, row in enumerate(ws.rows):
if rownr == 0:
keys = [cell.value for cell in row]
continue
rowdict = dict(zip(keys, [cell.value for cell in row]))
if rowdict['id'] == instance.id \
and rowdict['field'] == field.getName():
matches.append(rowdict)
if type(field.default) == dict:
return matches[0] if matches else {}
else:
return matches
def set(self, instance, field, value):
# mutator = field.getMutator(instance)
outval = self.mutate(instance, field, value)
if field.getName() == 'id':
# I don't know why, but if we use field.set for setting the id, it
# lands in the database as a unicode string causing catalog failure
instance.id = outval
else:
field.set(instance, outval)
def mutate(self, instance, field, value):
# Ints and bools are transparent
if type(value) in (int, bool):
return value
# All strings must be encoded
if isinstance(value, unicode):
value = value.encode('utf-8')
# RecordField is a single dictionary from the lookup table
if isinstance(field, RecordField):
value = self.resolve_records(instance, field, value) \
if value else {}
# RecordsField is a list of dictionaries from the lookup table
elif isinstance(field, RecordsField) or \
(isinstance(value, basestring)
and value
and value.endswith('_values')):
value = self.resolve_records(instance, field, value) \
if value else []
# ReferenceField looks up single ID from cell value, or multiple
# IDs from a lookup table
if Field.IReferenceField.providedBy(field):
value = self.resolve_reference_ids_to_uids(instance, field, value)
# LinesField was converted to a multiline string on export
if Field.ILinesField.providedBy(field):
value = value.splitlines() if value else ()
# XXX THis should not be reading entire file contents into mem.
# TextField provides the IFileField interface, these must be ignored.
elif value and Field.IFileField.providedBy(field) \
and not Field.ITextField.providedBy(field):
if not os.path.exists(os.path.join(self.tempdir, value)):
print "Expected file does not exist: " + value
return ''
value = open(os.path.join(self.tempdir, value)).read()
return value
def import_laboratory(self):
instance = self.portal.bika_setup.laboratory
schema = instance.schema
ws = self.wb['Laboratory']
for row in ws.rows:
fieldname = row[0].value
cellvalue = row[1].value
field = schema[fieldname]
self.set(instance, field, cellvalue)
def import_bika_setup(self):
instance = self.portal.bika_setup
schema = instance.schema
ws = self.wb['BikaSetup']
for row in ws.rows:
fieldname = row[0].value
cellvalue = row[1].value
field = schema[fieldname]
self.set(instance, field, cellvalue)
def import_portal_type(self, portal_type):
if portal_type not in self.wb:
return None
pt = getToolByName(self.portal, 'portal_types')
if portal_type not in pt:
print 'Error: %s not found in portal_types.' % portal_type
return None
fti = pt[portal_type]
ws = self.wb[portal_type]
keys = [cell.value for cell in ws.rows[0]]
for rownr, row in enumerate(ws.rows[1:]):
rowdict = dict(zip(keys, [cell.value for cell in row]))
# First, some fields we manually extract, to prevent them
# from being handled by the loop below:
path = rowdict['path'].encode('utf-8').strip('/').split('/')
del (rowdict['path'])
uid = rowdict['uid'].encode('utf-8')
del (rowdict['uid'])
instance_id = rowdict['id'].encode('utf-8')
del (rowdict['id'])
# We need to get 'title', for the case of aberrations with no value
# it's really required, so we use the ID in these cases.
title = rowdict['title'].encode('utf-8') if rowdict['title'] \
else instance_id
del (rowdict['title'])
parent = self.portal.unrestrictedTraverse(path)
instance = fti.constructInstance(parent, instance_id, title=title)
instance.unmarkCreationFlag()
instance.reindexObject()
for fieldname, value in rowdict.items():
field = instance.schema[fieldname]
self.set(instance, field, value)
def defer(self, instance, field, catalog, allowed_types, target_id):
self.deferred.append({
'instance': instance,
'field': field,
'catalog': catalog,
'allowed_types': allowed_types,
'target_id': target_id,
})
def solve_deferred(self):
# walk through self.deferred and link outstanding references
if self.deferred:
print 'Attempting to solve %s deferred reference targets' % \
len(self.deferred)
nr_unsolved = [0, len(self.deferred)]
while nr_unsolved[-1] > nr_unsolved[-2]:
unsolved = []
for d in self.deferred:
src_obj = d['instance']
src_field = d['field']
target_id = d['target_id']
allowed_types = d['allowed_types']
catalog = d['catalog']
try:
proxies = catalog(portal_type=allowed_types, id=target_id)
except:
continue
if len(proxies) > 0:
obj = proxies[0].getObject()
if src_field.multiValued:
value = src_field.get(src_obj)
if obj.UID() not in value:
value.append(obj.UID())
else:
value = obj.UID()
src_field.set(src_obj, value)
else:
unsolved.append(d)
self.deferred = unsolved
nr_unsolved.append(len(unsolved))
if self.deferred:
print 'Failed to solve %s deferred targets:' % len(self.deferred)
pprint.pprint(self.deferred)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Import bika setupdata created by export_bika_setup.py',
epilog='This script is meant to be run with zopepy or bin/instance.'
' See http://docs.plone.org/develop/plone/misc/commandline.html'
' for details.'
)
parser.add_argument(
'-s',
dest='sitepath',
required=True,
help='full path to Plone site root. Site will be created if it does'
' not already exist.')
parser.add_argument(
'-i',
dest='inputfile',
required=True,
help='input zip file, created by the export script.')
parser.add_argument(
'-u',
dest='username',
default='admin',
help='zope admin username (default: admin)')
parser.add_argument(
'-t',
dest='title',
default='Plone',
help='If a new Plone site is created, this specifies the site Title.'),
parser.add_argument(
'-l',
dest='language',
default='en',
help='If a new Plone site is created, this is the site language.'
' (default: en)')
parser.add_argument(
'-p',
dest='profiles',
action='append',
help='If a new Plone site is created, this option may be used to'
' specify additional profiles to be activated.'),
args, unknown = parser.parse_known_args()
main = Main(args)
main()