Coverage for control/utils.py : 78%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""Things that do not find a more logical place.
3* Utitility functions
4* Character constants
5"""
7import sys
8import re
9import json
10from json import JSONEncoder
11from bson.objectid import ObjectId
13from base64 import b64encode, b64decode
14from datetime import datetime as dt
15from flask import request
18REGION_SHIFT = 0x1F1E6 - ord("A")
19"""Offset of the Unicode position where flag symbols start w.r.t. to `'A'`."""
21ISO_DTP = """%Y-%m-%dT%H:%M:%S.%f"""
22ISO_DT = """%Y-%m-%dT%H:%M:%S"""
23ISO_D = """%Y-%m-%d"""
25E = ""
26BLANK = " "
27COMMA = ","
28COLON = ":"
29DOT = "."
30PIPE = "|"
31T = "T"
32Z = "Z"
33AT = "@"
34EURO = "€"
35MINONE = "-1"
36ZERO = "0"
37ONE = "1"
38TWO = "2"
39THREE = "3"
40SLASH = "/"
41LOW = "_"
42AMP = "&"
43LT = "<"
44APOS = "'"
45QUOT = '"'
46DOLLAR = "$"
47Q = "?"
48S = "s"
50NL = "\n"
51TAB = "\t"
52LINE_SEP = "§"
53MIDDLE_DOT = "\u00b7"
55PLUS = "+"
56MIN = "-"
57HYPHEN = "-"
58WHYPHEN = " - "
59ELLIPS = "..."
60ON = " on "
62NBSP = "
"
64LATIN1 = "latin1"
65UTF8 = "utf8"
67EMPTY_DATE = "1900-01-01T00:00:00Z"
69ITER = "__iter__"
72class MongoJSONEncoder(JSONEncoder):
73 def __init__(self, *args, **kwargs):
74 super().__init__(*args, **kwargs)
76 def default(self, obj):
77 if isinstance(obj, dt):
78 return obj.isoformat()
79 elif isinstance(obj, ObjectId):
80 return str(obj)
81 return JSONEncoder.default(self, obj)
84mjson = MongoJSONEncoder(ensure_ascii=False).encode
87def mktsv(data):
88 if data is None:
89 return ""
91 allHeaders = set()
92 for row in data:
93 allHeaders |= set(row)
94 allHeaders = sorted(allHeaders)
96 lines = ["\t".join(allHeaders)]
97 for row in data:
98 values = []
99 for field in allHeaders:
100 value = row.get(field, "")
101 if type(value) in {list, tuple}:
102 value = MIDDLE_DOT.join(str(v) for v in value)
103 else:
104 value = str(value)
105 value = value.replace("\n", LINE_SEP).replace("\t", " ")
106 values.append(value)
107 lines.append("\t".join(values))
108 return "\n".join(lines)
111def factory(name, Base, Deriveds):
112 """Find the base or derived class by registered name.
114 Parameters
115 ----------
116 Base: class
117 Start the lookup here.
118 Deriveds: iterable of (name, class)
119 A list of derived classes with their names.
121 Returns
122 -------
123 class
124 """
126 Derived = Base
127 for (nm, NmCl) in Deriveds:
128 if nm == name:
129 Derived = NmCl
130 break
132 return Derived
135def utf8FromLatin1(s):
136 """Get Unicode from a latin1 string.
138 !!! hint
139 Needed to process the values of environment variables, in particular
140 those from the identity provider..
142 Parameters
143 ----------
144 s: string(latin1)
146 Returns
147 -------
148 string(utf8)
149 """
150 return str(bytes(s, encoding=LATIN1), encoding=UTF8)
153def bencode(s):
154 """Serialize a complex data structure into a plain ASCII string.
156 !!! hint
157 Needed to pass the original value into an edit widget, so that the Javascript
158 has a way to know whether an edited value is dirty or not.
160 Parameters
161 ----------
162 s: Python value
164 Returns
165 -------
166 string(ascii)
167 """
169 return b64encode(json.dumps(s, separators=(COMMA, COLON)).encode()).decode()
172def bdecode(s):
173 """Interpets a serialized value as a Python value.
175 Parameters
176 ----------
177 s: string(ascii)
179 Returns
180 -------
181 Python value.
182 """
184 return json.loads(b64decode(s.encode()).decode())
187def cap1(s):
188 """The first letter capitalized.
190 Parameters
191 ----------
192 s: string
194 Returns
195 -------
196 string
197 """
199 return E if not s else s[0].upper() + s[1:]
202def shiftRegional(iso):
203 """Transpose iso country code into flag.
205 By shifting the 2-letter iso country code with a fixed offset,
206 we get two Unicode characters that browsers know to render as a flag symbol
207 for that country.
209 Parameters
210 ----------
211 iso: string
212 2-letter iso country code.
214 Returns
215 -------
216 flag:string
217 2-letter unicode, starting from `control.utils.REGION_SHIFT`.
218 """
220 return E.join(chr(ord(r) + REGION_SHIFT) for r in iso)
223def now():
224 """The current moment in time as a `datetime` value."""
226 return dt.utcnow()
229def thisYear():
230 """The current year as number."""
232 return dt.utcnow().year
235def debug(*msg):
236 """Print a message to the std error immediately."""
238 sys.stderr.write(f"""{" ".join(msg)}{NL}""")
239 sys.stderr.flush()
242def serverprint(*msg):
243 """Print a message to the console immediately."""
245 sys.stdout.write(f"""{" ".join(msg)}{NL}""")
246 sys.stdout.flush()
249def dtm(isostr):
250 """Get a datetime value from an ISO string representing time."""
252 isostr = isostr.rstrip(Z)
253 try:
254 date = dt.strptime(isostr, ISO_DTP)
255 except Exception:
256 try:
257 date = dt.strptime(isostr, ISO_DT)
258 except Exception:
259 try:
260 date = dt.strptime(isostr, ISO_D)
261 except Exception as err:
262 return (str(err), isostr)
263 return (E, date)
266def isIterable(value):
267 """Whether a value is a non-string iterable.
269 !!! note
270 Strings are iterables.
271 We want to know whether a value is a string or an iterable of strings.
272 """
274 return type(value) is not str and hasattr(value, ITER)
277def asString(value):
278 """Join an iterable of strings into a string.
280 And if the value is already a string, return it, and if it is `None`
281 return the empty string.
282 """
284 return E if value is None else E.join(value) if isIterable(value) else value
287def getLast(sequence):
288 """Get the last element of a sequence or `None` if the sequence is empty."""
290 return sequence[-1] if sequence else None
293def pick(record, field, default=None):
294 """Get the value for a key in a dict, or None if there is no dict.
296 !!! warning
297 But if the value for `field` in the record is `None`, `None` will be returned.
299 Parameters
300 ----------
301 record: dict | `None`
302 `pick` should work in both cases.
303 field: string
304 The field in `record` we want to extract.
305 default: mixed
306 Default value.
308 Returns
309 -------
310 value | `None`
311 The value is the default if the record is `None`, or if the record has no
312 `field`.
313 Otherwise it is the value for `field` from the record.
314 """
316 return default if record is None else record.get(field, default)
319def creators(record, creatorField, editorsField):
320 """List all ids in two fields of a record.
322 Parameters
323 ----------
324 record: dict
325 The source record
326 creatorField: string
327 The name of a field with a single id value.
328 editorsFields: string
329 The name of a field with multiple id values.
331 Returns
332 -------
333 list
334 A sorted list of all ids encountered in those fields.
335 """
337 editors = set(pick(record, editorsField, default=[]))
338 editors.add(pick(record, creatorField))
339 return sorted(editors)
342def filterModified(modified):
343 """Filter a provenance trail.
345 The provenance trail is a list of strings shaped as `"actor on date"` corresponding
346 to changes in a record.
348 After filtering we retain for each day only the last modification event per person.
349 """
351 logicM = decomposeM(modified)
352 chunks = perDay(logicM)
353 thinned = thinM(chunks)
354 return composeM(thinned)
357def decomposeM(modified):
358 """Auxiliary in provenance filtering: split an entry into name and date."""
360 splits = [m.rsplit(ON, 1) for m in modified]
361 return [(m[0], dtm(m[1].replace(BLANK, T))[1]) for m in splits]
364def trimM(mdt, trim):
365 """Auxiliary in provenance filtering: trim the secoonds part.
367 Parameters
368 ----------
369 mdt: string
370 Modification date in iso shape.
371 trim: boolean
372 Whether or not to trim the decimal parts of the seconds aways.
373 """
375 return str(mdt).split(BLANK)[0] if trim == 1 else str(mdt).split(DOT)[0]
378def composeM(modified):
379 """Auxiliary in provenance filtering: compose the trimmed parts."""
381 return [f"""{m[0]}{ON}{trimM(m[1], trim)}""" for (m, trim) in reversed(modified)]
384def perDay(modified):
385 """Auxiliary in provenance filtering: chunk the trails into daily bits."""
387 chunks = {}
388 for m in modified:
389 chunks.setdefault(dt.date(m[1]), []).append(m)
390 return [chunks[date] for date in sorted(chunks)]
393def thinM(chunks):
394 """Auxiliary in provenance filtering: weed out the non-last items per day."""
396 modified = []
397 nChunks = len(chunks)
398 for (i, chunk) in enumerate(chunks):
399 isLast = i == nChunks - 1
400 people = {}
401 for m in chunk:
402 people.setdefault(m[0], []).append(m[1])
403 thinned = []
404 for (p, dates) in people.items():
405 thinned.append((p, sorted(dates)[-1]))
406 for m in sorted(thinned, key=lambda x: x[1]):
407 modified.append((m, 2 if isLast else 1))
408 return modified
411IDLIKE_RE = re.compile(r"^[0-9a-f]+$", re.S)
414def isIdLike(val):
415 return IDLIKE_RE.match(val)
418NAMELIKE_RE = re.compile(r"^[0-9a-zA-Z_]+$", re.S)
421def isNameLike(val):
422 return NAMELIKE_RE.match(val)
425def isEmailLike(val):
426 parts = val.split("@")
427 if len(parts) != 2: 427 ↛ 429line 427 didn't jump to line 429, because the condition on line 427 was never false
428 return False
429 good = True
430 for part in parts:
431 if not part.replace("_", "").replace("-", "").replace("+", "").isalnum():
432 good = False
433 break
434 return good
437def isEppnLike(val):
438 parts = val.split("@")
439 good = True
440 for part in parts:
441 if not part.replace("_", "").replace("-", "").replace("+", "").isalnum():
442 good = False
443 break
444 return good
447def isFileLike(val):
448 parts = val.split("/")
449 good = True
450 for part in parts:
451 if (
452 not part.replace("_", "")
453 .replace("-", "")
454 .replace("+", "")
455 .replace(".", "")
456 .isalnum()
457 ):
458 good = False
459 break
460 return good
463def isNamesLike(val):
464 parts = val.split(",")
465 good = True
466 for part in parts: 466 ↛ 470line 466 didn't jump to line 470, because the loop on line 466 didn't complete
467 if not part.isalnum(): 467 ↛ 466line 467 didn't jump to line 466, because the condition on line 467 was never false
468 good = False
469 break
470 return good
473def saveParam(v):
474 if not v:
475 return ""
476 if len(v) < 30:
477 return v
478 return f"{v[0:10]} ... {v[-10:]}"
481def getq(name):
482 return request.args.get(name, "")[0:64]