Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Computing workflow. 

2 

3* Initialize the workflow table 

4* Adjust workflow after user actions 

5""" 

6 

7import sys 

8 

9from config import Config as C, Names as N 

10from control.utils import getLast, pick as G, serverprint, creators 

11 

12 

# Shorthands for the configuration sections this module reads.
CB = C.base
CT = C.tables
CF = C.workflow
CM = C.mongo

# Debug settings of the base configuration; the `workflow` entry switches
# the "WORKFLOW: ..." progress messages on or off.
DEBUG = CB.debug
DEBUG_WORKFLOW = G(DEBUG, N.workflow)

# The user tables; the first one is treated as the main table and the
# second one as the intermediate table between main records and entries.
USER_TABLES_LIST = CT.userTables
MAIN_TABLE, INTER_TABLE = USER_TABLES_LIST[0], USER_TABLES_LIST[1]

# Mapping of master tables to their detail tables.
DETAILS = CT.details

# All tables that feed the workflow, as an ordered list and as a set
# for membership tests.
WORKFLOW_TABLES_LIST = USER_TABLES_LIST + CT.userEntryTables
WORKFLOW_TABLES = set(WORKFLOW_TABLES_LIST)

# NOTE(review): this rebinds DEBUG, replacing the configuration value read
# above with a hard-coded id string (presumably the id of a contribution
# used while debugging -- confirm).  DEBUG_WORKFLOW has already been
# derived at this point and is not affected.
DEBUG = "5a1690a32179c013250d932a"

29 

30 

class Workflow:
    """Manages workflow information.

    Workflow is the concept that contributions, assessments and reviews
    undergo steps in a certain order, and that their treatment is dependent on
    the stage they are in. See the workflow.yaml config file.

    Workflow information is represented in records that correspond to contrib records
    in that the contrib record and the workflow record have one and the same id.

    A workflow record for a contrib contains all the relevant info (as far as workflow
    is concerned) of the contrib record and its (valid) assessment and their (valid)
    reviews. See `control.workflow.apply.WorkflowItem`.
    There it is defined how workflow information is *applied*.

    At startup time, the workflow information is computed from scratch and stored
    in the database.

    This class is about computing and managing the workflow information.

    ## Fixity

    Due to workflow, records may become fixed, temporarily or permanently.
    The following workflow attributes will be computed:

    frozen
    :   permanently fixed due to a selection decision on the contrib.

        *   extends from contribs to assessments and reviews.
        *   workflow tasks: all forbidden

    done
    :   permanently fixed due to a final review decision.

        *   extends from review to assessment to contrib
        *   workflow tasks: only selection decisions allowed

    locked
    :   temporarily fixed due to review in progress.

        *   extends from assessment to contrib.
        *   workflow tasks: all allowed as far as they make sense
        *   also used to let final reviewer wait for expert reviewer

    Any record that carries one of these fixity attributes cannot be edited
    or deleted, except for the fields that get modified when an allowed
    workflow task is executed.
    """

    def __init__(self, db):
        """## Initialization

        Several pieces of data that will be used many times in workflow computations
        are fetched and stored as attributes.

        The workflow table itself is not touched here;
        (re)filling it is the job of `initWorkflow`.

        Parameters
        ----------
        db: object
            See below.
        """

        self.db = db
        """*object* The `control.db.Db` singleton

        The database is needed to store computed workflow information, so we store
        the Db singleton as attribute `db`.
        """

        decisionRecords = db.getValueRecords(N.decision)
        self.decisions = {
            G(record, N._id): G(record, N.rep) for record in decisionRecords
        }
        """*dict* Mapping of decision ids to decision verbs.

        !!! hint
            Think of `Accept`, `Reject`
        """

        self.decisionParticiple = {
            G(record, N._id): G(record, N.participle) for record in decisionRecords
        }
        """*dict* Mapping of decision ids to decision participles.

        !!! hint
            Think of `Accepted`, `Rejected`
        """

        scoreData = db.getValueRecords(N.score)
        self.scoreMapping = {
            G(record, N._id): G(record, N.score)
            for record in scoreData
            if N.score in record
        }
        """*dict* Mapping of score ids to numeric scores.
        """

        maxScoreByCrit = {}
        for record in scoreData:
            criteriaId = G(record, N.criteria)
            if criteriaId is None:
                # a score that is not tied to a criterion cannot contribute
                continue
            score = G(record, N.score, default=0)
            prevMax = maxScoreByCrit.get(criteriaId)
            if prevMax is None or score > prevMax:
                maxScoreByCrit[criteriaId] = score

        self.maxScoreByCrit = maxScoreByCrit
        """*dict* Mapping of criteria ids to the maximum score for that criterion.

        !!! note
            We collect the maximum score that can be given for a criteria,
            irrespective of any concrete assessment.

            We need the maximum to present a given score as a percentage.
        """

    def initWorkflow(self, drop=False):
        """(Re)fills the workflow table.

        !!! caution
            This is not needed if the workflow table stays in sync
            with the other data in the database.
            So, normally, it is best not to carry out this step, because
            when workers start and restart, we do not want a big table
            operation to happen that is visible across workers.

        When the server starts, we carry out this function once.

        !!! hint "Gunicorn"
            On `gunicorn`, we start the server with `--preload`,
            hence the workflow init happens before any worker starts.

        !!! hint "Build script"
            You can manually trigger the workflow initialization by means
            of the build script, whether or not the webserver runs.

        !!! hint "Sysadmin"
            System administrators can trigger the workflow initialization
            by means of a button in the sidebar, only visible and executable by them.

        Parameters
        ----------
        drop: boolean
            If True, the complete table will first be dropped and then
            recreated.
            Otherwise, the table will merely be cleared.

        Returns
        -------
        int
            The number of workflow records stored.

        Raises
        ------
        SystemExit
            With exit code 4, if one and the same record object occurs more
            than once among the records to be stored.
        """

        db = self.db

        if drop:
            if DEBUG_WORKFLOW:
                serverprint("WORKFLOW: Drop exisiting table")
            db.dropWorkflow()
        else:
            if DEBUG_WORKFLOW:
                serverprint("WORKFLOW: Clear exisiting table")
            db.clearWorkflow()

        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: Read user (entry) tables")
        entries = {table: db.entries(table) for table in WORKFLOW_TABLES}

        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: Link masters and details")
        self.aggregate(entries)

        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: Compute workflow info")
        wfRecords = []
        for mainRecord in G(entries, MAIN_TABLE, default={}).values():
            info = self.computeWorkflow(record=mainRecord)
            if info:
                wfRecords.append(info)

        nWf = len(wfRecords)
        if DEBUG_WORKFLOW:
            serverprint(f"WORKFLOW: Store {nWf} workflow records")

        # check whether the wfRecords are distinct objects, otherwise we'll
        # get a bulk-write error
        recordsByObject = {}
        for record in wfRecords:
            recordsByObject.setdefault(id(record), []).append(record)
        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: CHECKING DUPLICATES: ...")
        good = True
        for records in recordsByObject.values():
            if len(records) > 1:
                # Always report this, not only in debug mode: we are about
                # to terminate the process and the reason must be visible.
                serverprint(
                    f"WORKFLOW: DUPLICATE OBJECTS TO BE INSERTED ({len(records)} x:"
                )
                serverprint(records[0])
                good = False
        if not good:
            sys.exit(4)
        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: NO DUPLICATES")

        if wfRecords:
            db.insertWorkflowMany(wfRecords)
        if DEBUG_WORKFLOW:
            serverprint("WORKFLOW: Initialization done")
        return nWf

    def insert(self, contribId):
        """Computes and stores workflow for a single contribution.

        Parameters
        ----------
        contribId: ObjectId
            The contrib for which to compute workflow.
        """

        db = self.db

        info = self.computeWorkflow(contribId=contribId)
        # computeWorkflow yields an empty dict when the contrib cannot be
        # found; the id is set here so that the stored record always has one
        info[N._id] = contribId
        if DEBUG_WORKFLOW:
            serverprint(f"WORKFLOW: New workflow info {contribId}")
        db.insertWorkflow(info)

    def recompute(self, contribId):
        """Recomputes and replaces workflow for a single contribution.

        Parameters
        ----------
        contribId: ObjectId
            The contrib for which to compute workflow.
        """

        db = self.db

        info = self.computeWorkflow(contribId=contribId)
        db.updateWorkflow(contribId, info)

    def delete(self, contribId):
        """Deletes workflow for a single contribution.

        Parameters
        ----------
        contribId: ObjectId
            The contrib for which to delete workflow.
        """

        db = self.db

        if DEBUG_WORKFLOW:
            serverprint(f"WORKFLOW: Delete workflow info {contribId}")
        db.deleteWorkflow(contribId)

    def computeWorkflow(self, record=None, contribId=None):
        """Computes workflow for a single contribution.

        Part of the work will be delegated to functions that
        retrieve workflow info off assessment and review records.

        Parameters
        ----------
        record: dict
            The full contrib record for which to compute workflow.
            If not given, the record will be retrieved on the basis
            of `contribId` parameter.
        contribId: ObjectId
            The id of the contrib for which to compute workflow.
            Only used when `record` is not given.

        Returns
        -------
        dict
            Workflow attributes.
            Empty if the record has no id (e.g. because it was not found).
        """

        if record is None:
            record = self.getFullItem(contribId)

        # from here on the id found in the record is leading
        contribId = G(record, N._id)
        if contribId is None:
            return {}

        contribType = G(record, N.typeContribution)
        selected = G(record, N.selected)
        dateDecided = G(record, N.dateDecided)

        # three-valued: no decision yet (None), selected, or rejected
        if selected is None:
            stage = N.selectNone
        elif selected:
            stage = N.selectYes
        else:
            stage = N.selectNo
        frozen = stage != N.selectNone

        # only assessments of the current contribution type count;
        # among those the last one is the valid one
        assessmentValid = getLast(
            [
                aRecord
                for aRecord in G(record, N.assessment, default=[])
                if contribType is not None
                and G(aRecord, N.assessmentType) == contribType
            ]
        )
        assessmentWf = (
            self.computeWorkflowAssessment(assessmentValid, frozen)
            if assessmentValid
            else {}
        )

        locked = G(assessmentWf, N.locked, default=False)
        done = G(assessmentWf, N.done, default=False)
        mayAdd = not (done or locked or frozen or assessmentValid)

        return {
            N._id: contribId,
            N.creators: creators(record, N.creator, N.editors),
            N.country: G(record, N.country),
            N.type: contribType,
            N.title: G(record, N.title),
            N.selected: selected,
            N.assessment: assessmentWf,
            N.stage: stage,
            N.stageDate: dateDecided,
            N.frozen: frozen,
            N.locked: locked,
            N.done: done,
            N.mayAdd: mayAdd,
        }

    def computeWorkflowAssessment(self, record, frozen):
        """Computes workflow info derived from an assessment record.

        This includes workflow information associated with the reviews
        of this assessment. However, that will be delegated to another function.

        Parameters
        ----------
        record: dict
            The assessment record that is the information source for the
            workflow information.
        frozen: boolean
            This is an attribute of the workflow, derived from the
            contribution record.
            It should be inherited by the associated assessment and review records.
            Hence it is passed down.

        Key attributes that will be computed are:

        Attributes
        ----------
        locked: boolean
            Workflow attribute that derives from the assessment.
            It is set to `True` when an assessment is currently under review.
            It is also important for the contribution, hence it will
            be passed upwards to it.
        done: boolean
            Workflow attribute that derives from the reviews.
            It is also important for the contribution, hence it will
            be passed upwards to it.
            It is set to `True` when the final reviewer has decided other than `Revise`.
            If `done`, also the assessment and the contribution
            count as `done`.

        Returns
        -------
        attributes: dict
            Workflow attributes
        """

        db = self.db
        typeCriteria = db.typeCriteria

        assessmentId = G(record, N._id)
        assessmentType = G(record, N.assessmentType)
        nCriteria = len(G(typeCriteria, assessmentType, default=[]))

        # the criteria entries that really belong to this assessment
        centries = [
            rec
            for rec in G(record, N.criteriaEntry, default=[])
            if (
                assessmentId is not None
                and G(rec, N.criteria) is not None
                and G(rec, N.assessment) == assessmentId
            )
        ]
        # complete: one entry per criterion, each with a score and evidence
        complete = len(centries) == nCriteria and all(
            G(rec, N.score) and G(rec, N.evidence) for rec in centries
        )
        submitted = G(record, N.submitted)
        dateSubmitted = G(record, N.dateSubmitted)
        dateWithdrawn = G(record, N.dateWithdrawn)
        withdrawn = bool(dateWithdrawn) and not submitted

        score = self.computeScore(centries)

        reviewer = {
            N.expert: G(record, N.reviewerE),
            N.final: G(record, N.reviewerF),
        }
        reviewers = sorted(set(reviewer.values()) - {None})

        reviewsWf = {}

        for (kind, theReviewer) in reviewer.items():
            # the valid review of a reviewer is his/her last review
            # of the same type as the assessment
            reviewValid = getLast(
                [
                    rec
                    for rec in G(record, N.review, default=[])
                    if G(rec, N.creator) == theReviewer
                    and G(rec, N.reviewType) == assessmentType
                ]
            )
            reviewsWf[kind] = self.computeWorkflowReview(kind, reviewValid, frozen)

        expertReviewWf = G(reviewsWf, N.expert)
        finalReviewWf = G(reviewsWf, N.final)
        finalReviewStage = G(finalReviewWf, N.stage)
        finalReviewDate = G(finalReviewWf, N.stageDate)

        # A "revise" decision is compared with the submission date to see
        # whether the assessor has resubmitted since.  Both dates must be
        # present: comparing with a missing date would raise a TypeError.
        # If a date is missing, neither flag is set and the plain
        # submitted/complete/incomplete stage applies.
        revisable = (
            bool(submitted)
            and finalReviewStage == N.reviewRevise
            and finalReviewDate is not None
            and dateSubmitted is not None
        )
        revisedProgress = revisable and finalReviewDate > dateSubmitted
        revisedDone = revisable and finalReviewDate < dateSubmitted

        if withdrawn:
            stage = N.completeWithdrawn if complete else N.incompleteWithdrawn
        elif revisedProgress:
            stage = N.completeRevised if complete else N.incompleteRevised
        elif revisedDone:
            stage = N.submittedRevised
        elif submitted:
            stage = N.submitted
        elif complete:
            stage = N.complete
        else:
            stage = N.incomplete
        stageDate = dateWithdrawn if withdrawn else dateSubmitted

        locked = stage in {N.submitted, N.submittedRevised}

        done = bool(finalReviewStage) and finalReviewStage != N.reviewRevise

        if done:
            # a final decision also finishes the individual reviews
            if expertReviewWf:
                expertReviewWf[N.done] = True
            if finalReviewWf:
                finalReviewWf[N.done] = True

        mayAdd = {
            kind: not frozen and not done and not G(reviewsWf, kind)
            for kind in (N.expert, N.final)
        }

        return {
            N._id: assessmentId,
            N.creators: creators(record, N.creator, N.editors),
            N.title: G(record, N.title),
            N.submitted: submitted,
            N.reviewer: reviewer,
            N.reviewers: reviewers,
            N.reviews: reviewsWf,
            N.score: score,
            N.stage: stage,
            N.stageDate: stageDate,
            N.frozen: frozen,
            N.locked: locked,
            N.done: done,
            N.mayAdd: mayAdd,
        }

    def computeWorkflowReview(self, kind, record, frozen):
        """Computes workflow info derived from a review record.

        !!! note
            Nothing in the review itself indicates what kind a review is.
            But the associated assessment specifies an expert reviewer and a
            final reviewer.
            Hence the creator of a review will tell what kind of review it is.

        Parameters
        ----------
        kind: string {`expert`, `final`}
            The kind of review.
        record: dict
            The review record that is the information source for the
            workflow information.
            May be `None`, if there is no such review.
        frozen: boolean
            This is an attribute of the workflow, derived from the
            contribution record.
            It should be inherited by the associated assessment and review records.
            Hence it is passed from there to here.

        Returns
        -------
        dict
            Workflow attributes; empty if `record` is `None`.
        """

        if record is None:
            return {}

        decisionId = G(record, N.decision)
        decision = G(self.decisions, decisionId)

        # an expert review only advises, any other kind of review decides
        isExpert = kind == N.expert
        if decision == N.Accept:
            stage = N.reviewAdviseAccept if isExpert else N.reviewAccept
        elif decision == N.Reject:
            stage = N.reviewAdviseReject if isExpert else N.reviewReject
        elif decision == N.Revise:
            stage = N.reviewAdviseRevise if isExpert else N.reviewRevise
        else:
            stage = None

        return {
            N._id: G(record, N._id),
            N.creators: creators(record, N.creator, N.editors),
            N.title: G(record, N.title),
            N.decision: decisionId,
            N.kind: kind,
            N.stage: stage,
            N.stageDate: G(record, N.dateDecided),
            N.frozen: frozen,
        }

    def computeScore(self, criteriaEntries):
        """Computes the score of an assessment.

        The overall score relates the scores given by the assessor in
        his/her criteriaEntries to the maximum scores that could have been given.

        Parameters
        ----------
        criteriaEntries: iterable of dict
            The records in which an assessor enters his/her evaluation.
            Each record gets a score.
            An entry without a (known) score counts as score 0.
            Entries with a negative score are left out of the computation
            (presumably a negative score marks a criterion that does
            not apply -- confirm against the score table).

        Returns
        -------
        dict
            Overall score plus other quantities that serve to present a
            derivation of the overall score.
            The overall score is the sum of the relevant scores as a rounded
            percentage of the sum of their maximum scores,
            or 0 if that maximum is 0.
        """

        scoreMapping = self.scoreMapping
        maxScoreByCrit = self.maxScoreByCrit

        # per entry: (score given, maximum score of its criterion)
        theseScores = []
        for cEntry in criteriaEntries:
            given = G(scoreMapping, G(cEntry, N.score)) or 0
            maximum = G(maxScoreByCrit, G(cEntry, N.criteria)) or 0
            theseScores.append((given, maximum))

        relevantScores = [pair for pair in theseScores if pair[0] >= 0]

        allMax = sum(maximum for (given, maximum) in theseScores)
        relevantMax = sum(maximum for (given, maximum) in relevantScores)
        relevantScore = sum(given for (given, maximum) in relevantScores)
        overall = 0 if relevantMax == 0 else round(relevantScore * 100 / relevantMax)

        return dict(
            overall=overall,
            relevantScore=relevantScore,
            relevantMax=relevantMax,
            allMax=allMax,
            relevantN=len(relevantScores),
            allN=len(theseScores),
        )

    def getFullItem(self, contribId):
        """Collect a contribution with all relevant assessments and reviews.

        Parameters
        ----------
        contribId: ObjectId
            The contrib whose information we want to gather.

        Returns
        -------
        dict
            The contrib record and its dependent records.
        """

        db = self.db

        entries = {}
        # The order of WORKFLOW_TABLES_LIST matters: the criterion for the
        # entry tables is built from the INTER_TABLE records fetched earlier.
        for table in WORKFLOW_TABLES_LIST:
            if table == MAIN_TABLE:
                crit = {N._id: contribId}
            elif table in CT.userTables:
                crit = {N.contrib: contribId}
            else:
                crit = {INTER_TABLE: db.inCrit(G(entries, INTER_TABLE, default={}))}
            entries[table] = db.entries(table, crit)
        self.aggregate(entries)

        return G(G(entries, MAIN_TABLE), contribId)

    @staticmethod
    def aggregate(entries):
        """Aggregates details records in the record of their master.

        Parameters
        ----------
        entries: dict
            a dict keyed by table name and valued by dicts of the records
            of that table, keyed by id.

        Returns
        -------
        void
            This function works in place, in `entries`.
            For every pair of a master table and a detail table that are
            both relevant to the workflow, each detail record that refers
            to a master is appended to a list that is stored in the master
            record under the name of the detail table.
            Detail records are added in the order of their creation date,
            records without a creation date come first.
            If needed, the master table and the master record are created
            in `entries`.
        """

        def byDateCreated(record):
            # Tuple key: undated records sort first without ever comparing
            # a placeholder number with a real date value.
            dateCreated = G(record, N.dateCreated)
            return (0, 0) if dateCreated is None else (1, dateCreated)

        for (masterTable, detailTables) in DETAILS.items():
            if masterTable not in WORKFLOW_TABLES:
                continue
            for detailTable in detailTables:
                if detailTable not in WORKFLOW_TABLES:
                    continue
                if DEBUG_WORKFLOW:
                    serverprint(
                        f"WORKFLOW: {masterTable}: lookup details from {detailTable}"
                    )
                detailRecords = G(entries, detailTable, default={}).values()
                for record in sorted(detailRecords, key=byDateCreated):
                    masterId = G(record, masterTable)
                    if masterId:
                        entries.setdefault(masterTable, {}).setdefault(
                            masterId, {}
                        ).setdefault(detailTable, []).append(record)

698 masterId, {} 

699 ).setdefault(detailTable, []).append(record)