Coverage for control/workflow/compute.py : 87%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Computing workflow.
3* Initialize the workflow table
4* Adjust workflow after user actions
5"""
7import sys
9from config import Config as C, Names as N
10from control.utils import getLast, pick as G, serverprint, creators
# Shorthands for the configuration sections used in this module.
CB, CT, CF, CM = C.base, C.tables, C.workflow, C.mongo

# Debug settings of the base config; the workflow debug flag is derived
# from them right here, before DEBUG is rebound further down.
DEBUG = CB.debug
DEBUG_WORKFLOW = G(DEBUG, N.workflow)

# The user tables: the first one serves as the main table,
# the second one as the intermediate table.
USER_TABLES_LIST = CT.userTables
MAIN_TABLE, INTER_TABLE = USER_TABLES_LIST[0], USER_TABLES_LIST[1]

# Master table => detail tables.
DETAILS = CT.details

# All tables that take part in the workflow, as ordered list and as set.
WORKFLOW_TABLES_LIST = CT.userTables + CT.userEntryTables
WORKFLOW_TABLES = set(WORKFLOW_TABLES_LIST)

# NOTE(review): DEBUG is rebound here to a hard-coded, id-like string, after
# DEBUG_WORKFLOW has been derived from the config value. This looks like
# debugging residue -- confirm whether it is still needed.
DEBUG = "5a1690a32179c013250d932a"
31class Workflow:
32 """Manages workflow information.
34 Workflow is the concept that contributions, assessments and reviews
35 undergo steps in a certain order, and that their treatment is dependent on
36 the stage they are in. See the workflow.yaml config file.
38 Workflow information is represented in records that correspond to contrib records
39 in that the contrib record and the workflow record have one and the same id.
41 A workflow record for a contrib contains all the relevant info (as far as workflow
42 is concerned) of the contrib record and its (valid) assessment and their (valid)
43 reviews. See `control.workflow.apply.WorkflowItem`.
44 There it is defined how workflow information is *applied*.
46 At startup time, the workflow information is computed from scratch and stored
47 in the database.
49 This class is about computing and managing the workflow information.
51 ## Fixity
53 Due to workflow, records may become fixed, temporarily or permanently.
54 The following workflow attributes will be computed:
56 frozen
57 : permanently fixed due to a selection decision on the contrib.
59 * extends from contribs to assessments and reviews.
60 * workflow tasks: all forbidden
62 done
63 : permanently fixed due to a final review decision.
65 * extends from review to assessment to contrib
66 * workflow tasks: only selection decisions allowed
68 locked
69 : temporarily fixed due to review in progress.
71 * extends from assessment to contrib.
72 * workflow tasks: all allowed as far as they make sense
73 * also used to let final reviewer wait for expert reviewer
75 Any record that carries one of these fixity attributes cannot be edited
76 or deleted, except for the fields that get modified when an allowed
77 workflow task is executed.
78 """
80 def __init__(self, db):
81 """## Initialization
83 Several pieces of data that will be used many times in workflow computations
84 are fetched and stored as attributes.
86 The previous workflow table is dropped and replaced by a freshly computed one.
88 Parameters
89 ----------
90 db: object
91 See below.
92 """
94 self.db = db
95 """*object* The `control.db.Db` singleton
97 The database is needed to store computed workflow information, so we store
98 the Db singleton as attribute `db`.
99 """
101 decisionRecords = db.getValueRecords(N.decision)
102 self.decisions = {
103 G(record, N._id): G(record, N.rep) for record in decisionRecords
104 }
105 """*dict* Mapping of decision ids to decision verbs.
107 !!! hint
108 Think of `Accept`, `Reject`
109 """
111 self.decisionParticiple = {
112 G(record, N._id): G(record, N.participle) for record in decisionRecords
113 }
114 """*dict* Mapping of decision ids to decision participles.
116 !!! hint
117 Think of `Accepted`, `Rejected`
118 """
120 scoreData = db.getValueRecords(N.score)
121 self.scoreMapping = {
122 G(record, N._id): G(record, N.score)
123 for record in scoreData
124 if N.score in record
125 }
126 """*dict* Mapping of score ids to numeric scores.
127 """
129 maxScoreByCrit = {}
130 for record in scoreData:
131 criteriaId = G(record, N.criteria)
132 if criteriaId is None: 132 ↛ 133line 132 didn't jump to line 133, because the condition on line 132 was never true
133 continue
134 score = G(record, N.score, default=0)
135 prevMax = maxScoreByCrit.setdefault(criteriaId, None)
136 if prevMax is None or score > prevMax: 136 ↛ 130line 136 didn't jump to line 130, because the condition on line 136 was never false
137 maxScoreByCrit[criteriaId] = score
139 self.maxScoreByCrit = maxScoreByCrit
140 """*dict* Mapping of criteria ids to the maximum score for that criterion.
142 !!! note
143 We collect the maximum score that can be given for a criteria,
144 irrespective of any concrete assessment.
146 We need the maximum to present a given score as a percentage.
147 """
149 def initWorkflow(self, drop=False):
150 """(Re)fills the workflow table.
152 !!! caution
153 This is not needed if the workflow table stays in sync
154 with the other data in the database.
155 So, normally, it is best not to carry out this step, because
156 when workers start and restart, we do not want a big table
157 operation to happen that is visible across workers.
159 When the server starts, we carry out this function once.
161 !!! hint "Gunicorn"
162 On `gunicorn`, we start the server with `--preload`,
163 hence the workflow init happens before any worker starts.
165 !!! hint "Build script"
166 You can manually trigger the workflow initialization by means
167 of the build script, whether or not the webserver runs.
169 !!! hint "Sysadmin"
170 System administrators can trigger the workflow initialization
171 by means of a button in the sidebar, only visible and executable by them.
173 Parameters
174 ----------
175 drop: boolean
176 If True, the complete table will first be dropped and then
177 recreated.
178 Otherwise, the table will merely be cleared.
180 Returns
181 -------
182 The number of workflow records stored.
183 """
185 db = self.db
187 if drop:
188 if DEBUG_WORKFLOW: 188 ↛ 189line 188 didn't jump to line 189, because the condition on line 188 was never true
189 serverprint("WORKFLOW: Drop exisiting table")
190 db.dropWorkflow()
191 else:
192 if DEBUG_WORKFLOW: 192 ↛ 193line 192 didn't jump to line 193, because the condition on line 192 was never true
193 serverprint("WORKFLOW: Clear exisiting table")
194 db.clearWorkflow()
196 entries = {}
197 if DEBUG_WORKFLOW: 197 ↛ 198line 197 didn't jump to line 198, because the condition on line 197 was never true
198 serverprint("WORKFLOW: Read user (entry) tables")
199 for table in WORKFLOW_TABLES:
200 entries[table] = db.entries(table)
202 if DEBUG_WORKFLOW: 202 ↛ 203line 202 didn't jump to line 203, because the condition on line 202 was never true
203 serverprint("WORKFLOW: Link masters and details")
204 self.aggregate(entries)
206 if DEBUG_WORKFLOW: 206 ↛ 207line 206 didn't jump to line 207, because the condition on line 206 was never true
207 serverprint("WORKFLOW: Compute workflow info")
208 wfRecords = []
209 for mainRecord in G(entries, MAIN_TABLE, default={}).values():
210 info = self.computeWorkflow(record=mainRecord)
211 if info: 211 ↛ 209line 211 didn't jump to line 209, because the condition on line 211 was never false
212 wfRecords.append(info)
214 nWf = len(wfRecords)
215 if DEBUG_WORKFLOW: 215 ↛ 216line 215 didn't jump to line 216, because the condition on line 215 was never true
216 serverprint(f"WORKFLOW: Store {nWf} workflow records")
217 # check whether the wfRecords are distinct objects, otherwise we'll
218 # get a bulk-write error
219 wfIds = {}
220 for record in wfRecords:
221 wfIds.setdefault(id(record), []).append(record)
222 if DEBUG_WORKFLOW: 222 ↛ 223line 222 didn't jump to line 223, because the condition on line 222 was never true
223 serverprint("WORKFLOW: CHECKING DUPLICATES: ...")
224 good = True
225 for (wfId, records) in wfIds.items():
226 if len(records) > 1: 226 ↛ 227line 226 didn't jump to line 227, because the condition on line 226 was never true
227 if DEBUG_WORKFLOW:
228 serverprint(
229 f"WORKFLOW: DUPLICATE OBJECTS TO BE INSERTED ({len(records)} x:"
230 )
231 serverprint(records[0])
232 good = False
233 if good: 233 ↛ 237line 233 didn't jump to line 237, because the condition on line 233 was never false
234 if DEBUG_WORKFLOW: 234 ↛ 235line 234 didn't jump to line 235, because the condition on line 234 was never true
235 serverprint("WORKFLOW: NO DUPLICATES")
236 else:
237 sys.exit(4)
239 if wfRecords:
240 db.insertWorkflowMany(wfRecords)
241 if DEBUG_WORKFLOW: 241 ↛ 242line 241 didn't jump to line 242, because the condition on line 241 was never true
242 serverprint("WORKFLOW: Initialization done")
243 return nWf
245 def insert(self, contribId):
246 """Computes and stores workflow for a single contribution.
248 Parameters
249 ----------
250 contribId: ObjectId
251 The contrib for which to compute workflow.
252 """
254 db = self.db
256 info = self.computeWorkflow(contribId=contribId)
257 info[N._id] = contribId
258 if DEBUG_WORKFLOW: 258 ↛ 259line 258 didn't jump to line 259, because the condition on line 258 was never true
259 serverprint(f"WORKFLOW: New workflow info {contribId}")
260 db.insertWorkflow(info)
262 def recompute(self, contribId):
263 """Recomputes and replaces workflow for a single contribution.
265 Parameters
266 ----------
267 contribId: ObjectId
268 The contrib for which to compute workflow.
269 """
271 db = self.db
273 info = self.computeWorkflow(contribId=contribId)
274 db.updateWorkflow(contribId, info)
276 def delete(self, contribId):
277 """Deletes workflow for a single contribution.
279 Parameters
280 ----------
281 contribId: ObjectId
282 The contrib for which to delete workflow.
283 """
285 db = self.db
287 if DEBUG_WORKFLOW: 287 ↛ 288line 287 didn't jump to line 288, because the condition on line 287 was never true
288 serverprint(f"WORKFLOW: Delete workflow info {contribId}")
289 db.deleteWorkflow(contribId)
291 def computeWorkflow(self, record=None, contribId=None):
292 """Computes workflow for a single contribution.
294 Part of the work will be delegated to functions that
295 retrieve workflow info off assessment and review records.
297 Parameters
298 ----------
299 record: dict
300 The full contrib record for which to compute workflow.
301 If not given, the record will be retrieved on the basis
302 of `contribId` parameter.
303 contribId: ObjectId
304 The id of the contrib for which to compute workflow.
306 Returns
307 -------
308 dict
309 Workflow attributes.
310 """
312 if record is None:
313 record = self.getFullItem(contribId)
315 contribId = G(record, N._id)
316 if contribId is None:
317 return {}
319 contribType = G(record, N.typeContribution)
320 selected = G(record, N.selected)
321 dateDecided = G(record, N.dateDecided)
323 stage = (
324 N.selectYes
325 if selected
326 else N.selectNone
327 if selected is None
328 else N.selectNo
329 )
330 frozen = stage != N.selectNone
332 assessmentValid = getLast(
333 [
334 aRecord
335 for aRecord in G(record, N.assessment, default=[])
336 if contribType is not None
337 and G(aRecord, N.assessmentType) == contribType
338 ]
339 )
340 if str(contribId) == DEBUG:
341 pass
342 assessmentWf = (
343 self.computeWorkflowAssessment(assessmentValid, frozen)
344 if assessmentValid
345 else {}
346 )
348 locked = G(assessmentWf, N.locked, default=False)
349 done = G(assessmentWf, N.done, default=False)
350 mayAdd = not done and not locked and not frozen and not assessmentValid
352 return {
353 N._id: contribId,
354 N.creators: creators(record, N.creator, N.editors),
355 N.country: G(record, N.country),
356 N.type: contribType,
357 N.title: G(record, N.title),
358 N.selected: G(record, N.selected),
359 N.assessment: assessmentWf,
360 N.stage: stage,
361 N.stageDate: dateDecided,
362 N.frozen: frozen,
363 N.locked: locked,
364 N.done: done,
365 N.mayAdd: mayAdd,
366 }
368 def computeWorkflowAssessment(self, record, frozen):
369 """Computes workflow info derived from an assessment record.
371 This includes workflow information associated with the reviews
372 of this assessment. However, that will be delegated to another function.
374 Parameters
375 ----------
376 record: dict
377 The assessment record that is the information source for the
378 workflow information.
379 frozen: boolean
380 This is an attribute of the workflow, derived from the
381 contribution record.
382 It should be inherited by the associated assessment and review records.
383 Hence it is passed down.
385 Key attributes that will be computed are:
387 Attributes
388 ----------
389 locked: boolean
390 Workflow attribute that derives from the assessment.
391 It is set to `True` when an assessment is currently under review.
392 It is also important for the contribution, hence it will
393 be passed upwards to it.
394 done: boolean
395 Workflow attribute that derives from the reviews.
396 It is also important for the contribution, hence it will
397 be passed upwards to it.
398 It is set to `True` when the final reviewer has decided other than `Revise`.
399 If `done`, also the assessment and the contribution
400 count as `done`.
402 Returns
403 -------
404 attributes: dict
405 Workflow attributes
406 """
408 db = self.db
409 typeCriteria = db.typeCriteria
411 assessmentId = G(record, N._id)
412 assessmentType = G(record, N.assessmentType)
413 nCriteria = len(G(typeCriteria, assessmentType, default=[]))
415 centries = [
416 rec
417 for rec in G(record, N.criteriaEntry, default=[])
418 if (
419 assessmentId is not None
420 and G(rec, N.criteria) is not None
421 and G(rec, N.assessment) == assessmentId
422 )
423 ]
424 complete = len(centries) == nCriteria and all(
425 G(rec, N.score) and G(rec, N.evidence) for rec in centries
426 )
427 submitted = G(record, N.submitted)
428 dateSubmitted = G(record, N.dateSubmitted)
429 dateWithdrawn = G(record, N.dateWithdrawn)
430 withdrawn = not submitted and dateWithdrawn
432 score = self.computeScore(centries)
434 reviewer = {
435 N.expert: G(record, N.reviewerE),
436 N.final: G(record, N.reviewerF),
437 }
438 reviewers = sorted(set(reviewer.values()) - {None})
440 reviewsWf = {}
442 for (kind, theReviewer) in reviewer.items():
443 reviewValid = getLast(
444 [
445 rec
446 for rec in G(record, N.review, default=[])
447 if G(rec, N.creator) == theReviewer
448 and G(rec, N.reviewType) == assessmentType
449 ]
450 )
451 reviewWf = self.computeWorkflowReview(kind, reviewValid, frozen)
452 reviewsWf[kind] = reviewWf
454 finalReviewStage = None
456 expertReviewWf = G(reviewsWf, N.expert)
457 finalReviewWf = G(reviewsWf, N.final)
458 finalReviewStage = G(finalReviewWf, N.stage)
460 finalReviewDate = G(finalReviewWf, N.stageDate)
461 revisedProgress = (
462 submitted
463 and finalReviewStage == N.reviewRevise
464 and finalReviewDate > dateSubmitted
465 )
466 revisedDone = (
467 submitted
468 and finalReviewStage == N.reviewRevise
469 and finalReviewDate < dateSubmitted
470 )
472 stage = (
473 (N.completeWithdrawn if complete else N.incompleteWithdrawn)
474 if withdrawn
475 else (N.completeRevised if complete else N.incompleteRevised)
476 if revisedProgress
477 else N.submittedRevised
478 if revisedDone
479 else (
480 N.submitted if submitted else N.complete if complete else N.incomplete
481 )
482 )
483 stageDate = dateWithdrawn if withdrawn else dateSubmitted
485 locked = stage in {N.submitted, N.submittedRevised}
487 done = not not finalReviewStage and finalReviewStage != N.reviewRevise
489 if done:
490 if expertReviewWf: 490 ↛ 492line 490 didn't jump to line 492, because the condition on line 490 was never false
491 expertReviewWf[N.done] = True
492 if finalReviewWf: 492 ↛ 495line 492 didn't jump to line 495, because the condition on line 492 was never false
493 finalReviewWf[N.done] = True
495 mayAdd = {
496 kind: not frozen and not done and not G(reviewsWf, kind)
497 for kind in (N.expert, N.final)
498 }
500 return {
501 N._id: assessmentId,
502 N.creators: creators(record, N.creator, N.editors),
503 N.title: G(record, N.title),
504 N.submitted: G(record, N.submitted),
505 N.reviewer: reviewer,
506 N.reviewers: reviewers,
507 N.reviews: reviewsWf,
508 N.score: score,
509 N.stage: stage,
510 N.stageDate: stageDate,
511 N.frozen: frozen,
512 N.locked: locked,
513 N.done: done,
514 N.mayAdd: mayAdd,
515 }
517 def computeWorkflowReview(self, kind, record, frozen):
518 """Computes workflow info derived from a review record.
520 !!! note
521 Nothing in the review itself indicates what kind a review is.
522 But the associated assessment specifies an expert reviewer and a
523 final reviewer.
524 Hence the creator of a review will tell what kind of review it is.
526 Parameters
527 ----------
528 kind: string {`expert`, `final`}
529 The kind of review.
530 record: dict
531 The review record that is the information source for the
532 workflow information.
533 frozen: boolean
534 This is an attribute of the workflow, derived from the
535 contribution record.
536 It should be inherited by the associated assessment and review records.
537 Hence it is passed from there to here.
538 """
540 if record is None:
541 return {}
543 decisions = self.decisions
545 decision = G(decisions, G(record, N.decision))
547 stage = (
548 (
549 N.reviewAdviseAccept
550 if decision == N.Accept
551 else N.reviewAdviseReject
552 if decision == N.Reject
553 else N.reviewAdviseRevise
554 if decision == N.Revise
555 else None
556 )
557 if kind == N.expert
558 else (
559 N.reviewAccept
560 if decision == N.Accept
561 else N.reviewReject
562 if decision == N.Reject
563 else N.reviewRevise
564 if decision == N.Revise
565 else None
566 )
567 )
569 return {
570 N._id: G(record, N._id),
571 N.creators: creators(record, N.creator, N.editors),
572 N.title: G(record, N.title),
573 N.decision: G(record, N.decision),
574 N.kind: kind,
575 N.stage: stage,
576 N.stageDate: G(record, N.dateDecided),
577 N.frozen: frozen,
578 }
580 def computeScore(self, criteriaEntries):
581 """Computes the score of an assessment.
583 The assessment an average of the score given by the assessor to his/her
584 criteriaEntries.
586 Parameters
587 ----------
588 criteriaEntries: iterable of dict
589 The records in which an assessor enters his/her evalutation.
590 Each record gets a score.
591 The overall score is a simple average of all scores.
592 However, some criteria are not required, and a zero score for them
593 does not add to the average.
595 Returns
596 -------
597 dict
598 Overall score plus other quantities that serve to present a
599 derivation of the overall score.
600 """
602 scoreMapping = self.scoreMapping
603 maxScoreByCrit = self.maxScoreByCrit
604 theseScores = [
605 (
606 G(cEntry, N.criteria),
607 G(scoreMapping, G(cEntry, N.score)) or 0,
608 G(maxScoreByCrit, G(cEntry, N.criteria)) or 0,
609 )
610 for cEntry in criteriaEntries
611 ]
613 allMax = sum(x[2] for x in theseScores)
614 allN = len(theseScores)
616 relevantCriteriaEntries = [x for x in theseScores if x[1] >= 0]
617 relevantMax = sum(x[2] for x in relevantCriteriaEntries)
618 relevantScore = sum(x[1] for x in relevantCriteriaEntries)
619 relevantN = len(relevantCriteriaEntries)
620 overall = 0 if relevantMax == 0 else (round(relevantScore * 100 / relevantMax))
621 return dict(
622 overall=overall,
623 relevantScore=relevantScore,
624 relevantMax=relevantMax,
625 allMax=allMax,
626 relevantN=relevantN,
627 allN=allN,
628 )
630 def getFullItem(self, contribId):
631 """Collect a contribution with all relevant assessments and reviews.
633 Parameters
634 ----------
635 contribId: ObjectId
636 The contrib whose information we want to gather.
638 Returns
639 -------
640 dict
641 The contrib record and its dependent records.
642 """
644 db = self.db
646 entries = {}
647 for table in WORKFLOW_TABLES_LIST:
648 crit = (
649 {N._id: contribId}
650 if table == MAIN_TABLE
651 else {N.contrib: contribId}
652 if table in CT.userTables
653 else {INTER_TABLE: db.inCrit(G(entries, INTER_TABLE, default={}))}
654 )
655 entries[table] = db.entries(table, crit)
656 self.aggregate(entries)
658 return G(G(entries, MAIN_TABLE), contribId)
660 @staticmethod
661 def aggregate(entries):
662 """Aggregates details records in the record of their master.
664 Parameters
665 ----------
666 entries: dict
667 a dict keyed by table name and valued by lists of records of that table.
669 Returns
670 -------
671 void
672 This fuction works in place, in `entries`.
673 For every item `(detailTable, detailRecords)` in entries, where
674 detailTable is relevant to the workflow,
675 a possibly new key masterTable will be made in entries, and the
676 detailRecords will be put under that key as a dict keyed by id.
677 """
679 for (masterTable, detailTables) in DETAILS.items():
680 if masterTable in WORKFLOW_TABLES:
681 detailTablesWf = [
682 detailTable
683 for detailTable in detailTables
684 if detailTable in WORKFLOW_TABLES
685 ]
686 for detailTable in detailTablesWf:
687 if DEBUG_WORKFLOW: 687 ↛ 688line 687 didn't jump to line 688, because the condition on line 687 was never true
688 serverprint(
689 f"WORKFLOW: {masterTable}: lookup details from {detailTable}"
690 )
691 for record in sorted(
692 G(entries, detailTable, default={}).values(),
693 key=lambda r: G(r, N.dateCreated, default=0),
694 ):
695 masterId = G(record, masterTable)
696 if masterId: 696 ↛ 691line 696 didn't jump to line 691, because the condition on line 696 was never false
697 entries.setdefault(masterTable, {}).setdefault(
698 masterId, {}
699 ).setdefault(detailTable, []).append(record)