Module control.utils
Things that do not find a more logical place.
- Utility functions
- Character constants
Expand source code
"""Things that do not find a more logical place.
* Utility functions
* Character constants
"""
import sys
import re
import json
from json import JSONEncoder
from base64 import b64encode, b64decode
from datetime import datetime as dt
from datetime import timezone

from bson.objectid import ObjectId
from flask import request
REGION_SHIFT = 0x1F1E6 - ord("A")
"""Offset of the Unicode position where flag symbols start w.r.t. `'A'`."""

# Date/time patterns for `strptime`, from most to least specific.
ISO_DTP = """%Y-%m-%dT%H:%M:%S.%f"""
ISO_DT = """%Y-%m-%dT%H:%M:%S"""
ISO_D = """%Y-%m-%d"""

# Character and short-string constants.
E = ""
BLANK = " "
COMMA = ","
COLON = ":"
DOT = "."
PIPE = "|"
T = "T"
Z = "Z"
AT = "@"
EURO = "€"
MINONE = "-1"
ZERO = "0"
ONE = "1"
TWO = "2"
THREE = "3"
SLASH = "/"
LOW = "_"
AMP = "&"
LT = "<"
APOS = "'"
QUOT = '"'
DOLLAR = "$"
Q = "?"
S = "s"
NL = "\n"
TAB = "\t"
LINE_SEP = "§"
MIDDLE_DOT = "\u00b7"
PLUS = "+"
MIN = "-"
HYPHEN = "-"
WHYPHEN = " - "
ELLIPS = "..."
ON = " on "
# NOTE(review): this literal was split over two lines (an unterminated string);
# restored as the non-breaking space that the name suggests -- confirm.
NBSP = "\u00a0"

# Encoding names.
LATIN1 = "latin1"
UTF8 = "utf8"

EMPTY_DATE = "1900-01-01T00:00:00Z"

# Attribute name probed by `isIterable`.
ITER = "__iter__"
class MongoJSONEncoder(JSONEncoder):
    """JSON encoder that also handles `datetime` and `ObjectId` values.

    Datetimes are rendered with `isoformat()`, object ids with `str()`.
    All other values are left to the standard `JSONEncoder`.
    """

    def __init__(self, *args, **kwargs):
        # No state of our own: every argument is handed to `JSONEncoder`.
        super().__init__(*args, **kwargs)

    def default(self, obj):
        """Give a JSON-serializable replacement for `obj`.

        Parameters
        ----------
        obj: mixed
            A value that the standard encoder could not serialize.

        Returns
        -------
        string
            The ISO form of a datetime, or the string form of an object id.
            For any other value the base class implementation is called.
        """
        if isinstance(obj, dt):
            return obj.isoformat()
        if isinstance(obj, ObjectId):
            return str(obj)
        return super().default(obj)


# Ready-made encoding function; non-ASCII characters are not escaped.
mjson = MongoJSONEncoder(ensure_ascii=False).encode
def mktsv(data):
    """Render records as tab-separated text.

    The header row consists of the sorted union of all keys in all records.
    Missing fields become empty cells.
    List and tuple values are joined with `MIDDLE_DOT`.
    Newlines inside values are replaced by `LINE_SEP`, tabs by a blank.

    Parameters
    ----------
    data: iterable of dict | `None`
        The records. It may be a one-shot iterator.

    Returns
    -------
    string
        The header line followed by one line per record,
        or the empty string if `data` is `None`.
    """
    if data is None:
        return ""

    # The records are traversed twice (headers first, then rows), so a
    # one-shot iterator must be materialized or the rows would be lost.
    rows = list(data)

    allHeaders = set()
    for row in rows:
        allHeaders |= set(row)
    allHeaders = sorted(allHeaders)

    lines = ["\t".join(allHeaders)]
    for row in rows:
        values = []
        for field in allHeaders:
            value = row.get(field, "")
            if type(value) in {list, tuple}:
                value = MIDDLE_DOT.join(str(v) for v in value)
            else:
                value = str(value)
            # keep every record on one line and every value in one cell
            values.append(value.replace("\n", LINE_SEP).replace("\t", " "))
        lines.append("\t".join(values))
    return "\n".join(lines)
def factory(name, Base, Deriveds):
    """Look up a class by its registered name, with a fallback.

    Parameters
    ----------
    name: mixed
        The name to look for; it is compared by equality with the registered names.
    Base: class
        The class to return when no registered name matches.
    Deriveds: iterable of (name, class)
        The registered classes with their names.

    Returns
    -------
    class
        The class of the first pair whose name equals `name`, otherwise `Base`.
    """
    candidates = (Cls for (regName, Cls) in Deriveds if regName == name)
    return next(candidates, Base)
def utf8FromLatin1(s):
    """Reinterpret a string that was wrongly decoded as latin1.

    The string is turned back into bytes using latin1, and those bytes
    are then decoded as utf8.

    !!! hint
        Needed to process the values of environment variables, in particular
        those from the identity provider.

    Parameters
    ----------
    s: string(latin1)

    Returns
    -------
    string(utf8)
    """
    rawBytes = bytes(s, encoding=LATIN1)
    return str(rawBytes, encoding=UTF8)
def bencode(s):
    """Serialize a JSON-compatible value into a plain ASCII string.

    The value is dumped as compact JSON (no whitespace around separators)
    and the result is base64 encoded.

    !!! hint
        Needed to pass the original value into an edit widget, so that the Javascript
        has a way to know whether an edited value is dirty or not.

    Parameters
    ----------
    s: Python value

    Returns
    -------
    string(ascii)
    """
    compactJson = json.dumps(s, separators=(COMMA, COLON))
    encoded = b64encode(compactJson.encode())
    return encoded.decode()
def bdecode(s):
    """Interpret a serialized value as a Python value.

    The string is base64 decoded and the result is parsed as JSON.

    Parameters
    ----------
    s: string(ascii)

    Returns
    -------
    Python value.
    """
    rawJson = b64decode(s.encode()).decode()
    return json.loads(rawJson)
def cap1(s):
    """Capitalize the first character and leave the rest untouched.

    Parameters
    ----------
    s: string

    Returns
    -------
    string
        The empty string if `s` is empty or otherwise falsy.
    """
    if not s:
        return E
    head = s[0].upper()
    return head + s[1:]
def shiftRegional(iso):
    """Transpose an iso country code into a flag.

    Each character of the code is shifted by a fixed offset.
    For a 2-letter iso country code this gives two Unicode characters
    that browsers know to render as a flag symbol for that country.

    Parameters
    ----------
    iso: string
        2-letter iso country code.

    Returns
    -------
    flag:string
        2-letter unicode, starting from `control.utils.REGION_SHIFT`.
    """
    shifted = [chr(ord(letter) + REGION_SHIFT) for letter in iso]
    return E.join(shifted)
def now():
    """The current moment in time as a `datetime` value.

    Returns
    -------
    datetime
        The current UTC time as a naive value (without `tzinfo`).
    """
    # `datetime.utcnow()` is deprecated since Python 3.12.
    # Ask for an aware UTC value and strip the zone, so that callers still get
    # the same naive UTC datetime as before.
    return dt.now(timezone.utc).replace(tzinfo=None)
def thisYear():
    """The current year as number.

    Returns
    -------
    int
        The year of the current moment in UTC.
    """
    # `datetime.utcnow()` is deprecated since Python 3.12.
    return dt.now(timezone.utc).year
def debug(*msg):
    """Print a message to the std error immediately.

    Parameters
    ----------
    msg: iterable of mixed
        The parts of the message. They are joined with blanks.
        Parts that are not strings are converted with `str()`.
    """
    # Stringify first: joining a non-string part would raise a TypeError.
    text = " ".join(str(part) for part in msg)
    sys.stderr.write(f"""{text}{NL}""")
    sys.stderr.flush()
def serverprint(*msg):
    """Print a message to the console immediately.

    Parameters
    ----------
    msg: iterable of mixed
        The parts of the message. They are joined with blanks.
        Parts that are not strings are converted with `str()`.
    """
    # Stringify first: joining a non-string part would raise a TypeError.
    text = " ".join(str(part) for part in msg)
    sys.stdout.write(f"""{text}{NL}""")
    sys.stdout.flush()
def dtm(isostr):
    """Get a datetime value from an ISO string representing time.

    Trailing `Z` characters are stripped first. Then three shapes are tried
    in this order: date-time with fractional seconds, date-time, date only.

    Parameters
    ----------
    isostr: string

    Returns
    -------
    tuple
        `(E, datetime)` when one of the shapes matches.
        Otherwise `(message, stripped string)`, where the message is the
        error of the last attempt (the date-only shape).
    """
    isostr = isostr.rstrip(Z)
    failure = E
    for pattern in (ISO_DTP, ISO_DT, ISO_D):
        try:
            date = dt.strptime(isostr, pattern)
        except Exception as err:
            # remember the message; only the last one is reported
            failure = str(err)
        else:
            return (E, date)
    return (failure, isostr)
def isIterable(value):
    """Whether a value is a non-string iterable.

    !!! note
        Strings are iterables.
        We want to know whether a value is a string or an iterable of strings.

    Returns
    -------
    boolean
        `False` for values whose type is exactly `str`.
        Otherwise whether the value has an `__iter__` attribute.
    """
    if type(value) is str:
        return False
    return hasattr(value, ITER)
def asString(value):
    """Join an iterable of strings into a string.

    Returns
    -------
    string
        The empty string if the value is `None`.
        The concatenation of the members if the value is a non-string iterable.
        Otherwise the value itself.
    """
    if value is None:
        return E
    if isIterable(value):
        return E.join(value)
    return value
def getLast(sequence):
    """Get the last element of a sequence or `None` if the sequence is empty.

    A falsy value that is not a sequence, such as `None`, also gives `None`.
    """
    if not sequence:
        return None
    return sequence[-1]
def pick(record, field, default=None):
    """Get the value for a key in a dict, or the default if there is no dict.

    !!! warning
        If the record has `field` with value `None`, then `None` is returned,
        not the default.

    Parameters
    ----------
    record: dict | `None`
        `pick` should work in both cases.
    field: string
        The field in `record` we want to extract.
    default: mixed
        Default value.

    Returns
    -------
    value | `None`
        The value is the default if the record is `None`, or if the record has no
        `field`.
        Otherwise it is the value for `field` from the record.
    """
    if record is None:
        return default
    return record.get(field, default)
def creators(record, creatorField, editorsField):
    """List all ids in two fields of a record.

    Parameters
    ----------
    record: dict
        The source record
    creatorField: string
        The name of a field with a single id value.
    editorsField: string
        The name of a field with multiple id values.

    Returns
    -------
    list
        A sorted list of all ids encountered in those fields.
        A missing or `None` creator is left out.
    """
    # An editors field that is present but `None` counts as "no editors".
    editors = set(pick(record, editorsField, default=[]) or [])
    creator = pick(record, creatorField)
    # `None` is not an id. Besides, it cannot be ordered against real ids,
    # so keeping it would make `sorted()` raise a TypeError.
    if creator is not None:
        editors.add(creator)
    return sorted(editors)
def filterModified(modified):
    """Filter a provenance trail.

    The provenance trail is a list of strings shaped as `"actor on date"` corresponding
    to changes in a record.
    After filtering we retain for each day only the last modification event per person.

    The work is done in four steps: split the entries (`decomposeM`),
    group them by day (`perDay`), thin out each day (`thinM`)
    and turn the result into strings again (`composeM`).
    """
    return composeM(thinM(perDay(decomposeM(modified))))
def decomposeM(modified):
    """Auxiliary in provenance filtering: split an entry into name and date.

    Each entry is split at its last `" on "`. In the date part the blanks are
    replaced by `T` before it is parsed by `dtm`.

    Returns
    -------
    list of tuple
        For each entry a pair `(name, date)`, where date is the second
        member of the result of `dtm`.
    """
    pieces = [entry.rsplit(ON, 1) for entry in modified]
    result = []
    for piece in pieces:
        stamp = piece[1].replace(BLANK, T)
        result.append((piece[0], dtm(stamp)[1]))
    return result
def trimM(mdt, trim):
    """Auxiliary in provenance filtering: trim the seconds part.

    Parameters
    ----------
    mdt: mixed
        Modification date; it is converted with `str()` before trimming.
        Presumably a `datetime` as delivered by `dtm` -- to be confirmed.
    trim: integer
        If `1`, everything from the first blank onwards is cut off.
        Otherwise everything from the first dot onwards is cut off.

    Returns
    -------
    string
    """
    text = str(mdt)
    separator = BLANK if trim == 1 else DOT
    return text.split(separator)[0]
def composeM(modified):
    """Auxiliary in provenance filtering: compose the trimmed parts.

    Parameters
    ----------
    modified: sequence of ((name, date), trim)
        The items are processed in reversed order.

    Returns
    -------
    list of string
        Strings of the form `"name on date"`, with the date trimmed by `trimM`.
    """
    composed = []
    for (entry, trim) in reversed(modified):
        composed.append(f"""{entry[0]}{ON}{trimM(entry[1], trim)}""")
    return composed
def perDay(modified):
    """Auxiliary in provenance filtering: chunk the trails into daily bits.

    Parameters
    ----------
    modified: iterable of (name, datetime)

    Returns
    -------
    list of list
        The entries grouped by calendar day, the days in ascending order.
        Within a day the entries keep their original order.
    """
    byDay = {}
    for entry in modified:
        day = dt.date(entry[1])
        byDay.setdefault(day, [])
        byDay[day].append(entry)

    ordered = []
    for day in sorted(byDay):
        ordered.append(byDay[day])
    return ordered
def thinM(chunks):
    """Auxiliary in provenance filtering: weed out the non-last items per day.

    Parameters
    ----------
    chunks: list of list of (name, date)
        The entries per day, as delivered by `perDay`.

    Returns
    -------
    list of ((name, date), trim)
        Per chunk and per person only the entry with the latest date is kept,
        and within a chunk the kept entries are ordered by date.
        `trim` is `2` for entries of the last chunk and `1` for all others.
    """
    result = []
    lastIndex = len(chunks) - 1
    for (index, chunk) in enumerate(chunks):
        trim = 2 if index == lastIndex else 1

        datesByPerson = {}
        for entry in chunk:
            datesByPerson.setdefault(entry[0], []).append(entry[1])

        survivors = [
            (person, sorted(dates)[-1]) for (person, dates) in datesByPerson.items()
        ]
        survivors.sort(key=lambda pair: pair[1])
        result.extend((pair, trim) for pair in survivors)
    return result
IDLIKE_RE = re.compile(r"^[0-9a-f]+$", re.S)


def isIdLike(val):
    """Whether a string consists of lowercase hexadecimal digits only.

    Parameters
    ----------
    val: string

    Returns
    -------
    match object | `None`
        A truthy match object if `val` is one or more characters from
        `0-9a-f`, otherwise `None`.
    """
    # `fullmatch` instead of `match`: `$` also matches just before a trailing
    # newline, so `match` would accept a value like "abc\n".
    return IDLIKE_RE.fullmatch(val)
NAMELIKE_RE = re.compile(r"^[0-9a-zA-Z_]+$", re.S)


def isNameLike(val):
    """Whether a string consists of ASCII letters, digits and underscores only.

    Parameters
    ----------
    val: string

    Returns
    -------
    match object | `None`
        A truthy match object if `val` is one or more characters from
        `0-9a-zA-Z_`, otherwise `None`.
    """
    # `fullmatch` instead of `match`: `$` also matches just before a trailing
    # newline, so `match` would accept a value like "name\n".
    return NAMELIKE_RE.fullmatch(val)
def isEmailLike(val):
    """Whether a string has the shape of two simple names around one `@`.

    Both parts must be non-empty and consist of alphanumeric characters,
    `_`, `-` and `+`, with at least one alphanumeric character.

    !!! caution
        Dots are not accepted, so an address like `a@b.com` is rejected.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """
    parts = val.split("@")
    if len(parts) != 2:
        return False
    return all(
        part.replace("_", "").replace("-", "").replace("+", "").isalnum()
        for part in parts
    )
def isEppnLike(val):
    """Whether a string consists of simple names separated by `@`.

    Every part must be non-empty and consist of alphanumeric characters,
    `_`, `-` and `+`, with at least one alphanumeric character.
    Any number of `@` is accepted, including none.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """
    return all(
        part.replace("_", "").replace("-", "").replace("+", "").isalnum()
        for part in val.split("@")
    )
def isFileLike(val):
    """Whether a string consists of simple file names separated by `/`.

    Every part must be non-empty and consist of alphanumeric characters,
    `_`, `-`, `+` and `.`, with at least one alphanumeric character.
    So empty parts (as in a leading `/`) and parts like `..` are rejected.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """
    for part in val.split("/"):
        bare = part.replace("_", "").replace("-", "")
        bare = bare.replace("+", "").replace(".", "")
        if not bare.isalnum():
            return False
    return True
def isNamesLike(val):
    """Whether a string is a comma-separated list of alphanumeric names.

    Every part between the commas must be non-empty and alphanumeric.

    Parameters
    ----------
    val: string

    Returns
    -------
    boolean
    """
    return all(part.isalnum() for part in val.split(","))
def saveParam(v):
    """Abbreviate a long value.

    Parameters
    ----------
    v: string | `None`

    Returns
    -------
    string
        The empty string for a falsy value.
        The value itself if it is shorter than 30 characters.
        Otherwise its first 10 and last 10 characters with ` ... ` in between.
    """
    if not v:
        return ""
    if len(v) >= 30:
        return f"{v[0:10]} ... {v[-10:]}"
    return v
def getq(name):
    """Get a query parameter from the current request, capped in length.

    Parameters
    ----------
    name: string
        The name of the parameter in `request.args`.

    Returns
    -------
    string
        At most the first 64 characters of the value,
        or the empty string if the parameter is absent.
    """
    value = request.args.get(name, "")
    return value[0:64]
Global variables
var REGION_SHIFT
-
Offset of the Unicode position where flag symbols start w.r.t. 'A'.
Functions
def asString(value)
-
Join an iterable of strings into a string.
And if the value is already a string, return it, and if it is `None`, return the empty string.
Expand source code
def asString(value): """Join an iterable of strings into a string. And if the value is already a string, return it, and if it is `None` return the empty string. """ return E if value is None else E.join(value) if isIterable(value) else value
def bdecode(s)
-
Interprets a serialized value as a Python value.
Parameters
s
:string(ascii)
Returns
Python value.
Expand source code
def bdecode(s): """Interpets a serialized value as a Python value. Parameters ---------- s: string(ascii) Returns ------- Python value. """ return json.loads(b64decode(s.encode()).decode())
def bencode(s)
-
Serialize a complex data structure into a plain ASCII string.
Hint
Needed to pass the original value into an edit widget, so that the Javascript has a way to know whether an edited value is dirty or not.
Parameters
s
:Python value
Returns
string(ascii)
Expand source code
def bencode(s): """Serialize a complex data structure into a plain ASCII string. !!! hint Needed to pass the original value into an edit widget, so that the Javascript has a way to know whether an edited value is dirty or not. Parameters ---------- s: Python value Returns ------- string(ascii) """ return b64encode(json.dumps(s, separators=(COMMA, COLON)).encode()).decode()
def cap1(s)
-
The first letter capitalized.
Parameters
s
:string
Returns
string
Expand source code
def cap1(s): """The first letter capitalized. Parameters ---------- s: string Returns ------- string """ return E if not s else s[0].upper() + s[1:]
def composeM(modified)
-
Auxiliary in provenance filtering: compose the trimmed parts.
Expand source code
def composeM(modified): """Auxiliary in provenance filtering: compose the trimmed parts.""" return [f"""{m[0]}{ON}{trimM(m[1], trim)}""" for (m, trim) in reversed(modified)]
def creators(record, creatorField, editorsField)
-
List all ids in two fields of a record.
Parameters
record
:dict
- The source record
creatorField
:string
- The name of a field with a single id value.
editorsFields
:string
- The name of a field with multiple id values.
Returns
list
- A sorted list of all ids encountered in those fields.
Expand source code
def creators(record, creatorField, editorsField): """List all ids in two fields of a record. Parameters ---------- record: dict The source record creatorField: string The name of a field with a single id value. editorsFields: string The name of a field with multiple id values. Returns ------- list A sorted list of all ids encountered in those fields. """ editors = set(pick(record, editorsField, default=[])) editors.add(pick(record, creatorField)) return sorted(editors)
def debug(*msg)
-
Print a message to the std error immediately.
Expand source code
def debug(*msg): """Print a message to the std error immediately.""" sys.stderr.write(f"""{" ".join(msg)}{NL}""") sys.stderr.flush()
def decomposeM(modified)
-
Auxiliary in provenance filtering: split an entry into name and date.
Expand source code
def decomposeM(modified): """Auxiliary in provenance filtering: split an entry into name and date.""" splits = [m.rsplit(ON, 1) for m in modified] return [(m[0], dtm(m[1].replace(BLANK, T))[1]) for m in splits]
def dtm(isostr)
-
Get a datetime value from an ISO string representing time.
Expand source code
def dtm(isostr): """Get a datetime value from an ISO string representing time.""" isostr = isostr.rstrip(Z) try: date = dt.strptime(isostr, ISO_DTP) except Exception: try: date = dt.strptime(isostr, ISO_DT) except Exception: try: date = dt.strptime(isostr, ISO_D) except Exception as err: return (str(err), isostr) return (E, date)
def factory(name, Base, Deriveds)
-
Find the base or derived class by registered name.
Parameters
Base
:class
- Start the lookup here.
Deriveds
:iterable
of(name, class)
- A list of derived classes with their names.
Returns
class
Expand source code
def factory(name, Base, Deriveds): """Find the base or derived class by registered name. Parameters ---------- Base: class Start the lookup here. Deriveds: iterable of (name, class) A list of derived classes with their names. Returns ------- class """ Derived = Base for (nm, NmCl) in Deriveds: if nm == name: Derived = NmCl break return Derived
def filterModified(modified)
-
Filter a provenance trail.
The provenance trail is a list of strings shaped as "actor on date" corresponding to changes in a record.
After filtering we retain for each day only the last modification event per person.
Expand source code
def filterModified(modified): """Filter a provenance trail. The provenance trail is a list of strings shaped as `"actor on date"` corresponding to changes in a record. After filtering we retain for each day only the last modification event per person. """ logicM = decomposeM(modified) chunks = perDay(logicM) thinned = thinM(chunks) return composeM(thinned)
def getLast(sequence)
-
Get the last element of a sequence or `None` if the sequence is empty.
Expand source code
def getLast(sequence): """Get the last element of a sequence or `None` if the sequence is empty.""" return sequence[-1] if sequence else None
def getq(name)
-
Expand source code
def getq(name): return request.args.get(name, "")[0:64]
def isEmailLike(val)
-
Expand source code
def isEmailLike(val): parts = val.split("@") if len(parts) != 2: return False good = True for part in parts: if not part.replace("_", "").replace("-", "").replace("+", "").isalnum(): good = False break return good
def isEppnLike(val)
-
Expand source code
def isEppnLike(val): parts = val.split("@") good = True for part in parts: if not part.replace("_", "").replace("-", "").replace("+", "").isalnum(): good = False break return good
def isFileLike(val)
-
Expand source code
def isFileLike(val): parts = val.split("/") good = True for part in parts: if ( not part.replace("_", "") .replace("-", "") .replace("+", "") .replace(".", "") .isalnum() ): good = False break return good
def isIdLike(val)
-
Expand source code
def isIdLike(val): return IDLIKE_RE.match(val)
def isIterable(value)
-
Whether a value is a non-string iterable.
Note
Strings are iterables. We want to know whether a value is a string or an iterable of strings.
Expand source code
def isIterable(value): """Whether a value is a non-string iterable. !!! note Strings are iterables. We want to know whether a value is a string or an iterable of strings. """ return type(value) is not str and hasattr(value, ITER)
def isNameLike(val)
-
Expand source code
def isNameLike(val): return NAMELIKE_RE.match(val)
def isNamesLike(val)
-
Expand source code
def isNamesLike(val): parts = val.split(",") good = True for part in parts: if not part.isalnum(): good = False break return good
def mktsv(data)
-
Expand source code
def mktsv(data): if data is None: return "" allHeaders = set() for row in data: allHeaders |= set(row) allHeaders = sorted(allHeaders) lines = ["\t".join(allHeaders)] for row in data: values = [] for field in allHeaders: value = row.get(field, "") if type(value) in {list, tuple}: value = MIDDLE_DOT.join(str(v) for v in value) else: value = str(value) value = value.replace("\n", LINE_SEP).replace("\t", " ") values.append(value) lines.append("\t".join(values)) return "\n".join(lines)
def now()
-
The current moment in time as a `datetime` value.
Expand source code
def now(): """The current moment in time as a `datetime` value.""" return dt.utcnow()
def perDay(modified)
-
Auxiliary in provenance filtering: chunk the trails into daily bits.
Expand source code
def perDay(modified): """Auxiliary in provenance filtering: chunk the trails into daily bits.""" chunks = {} for m in modified: chunks.setdefault(dt.date(m[1]), []).append(m) return [chunks[date] for date in sorted(chunks)]
def pick(record, field, default=None)
-
Get the value for a key in a dict, or None if there is no dict.
Warning
But if the value for `field` in the record is `None`, `None` will be returned.
Parameters
record
:dict | None
- `pick()` should work in both cases.
field
:string
- The field in `record` we want to extract.
default
:mixed
- Default value.
Returns
value | None
- The value is the default if the record is `None`, or if the record has no `field`. Otherwise it is the value for `field` from the record.
Expand source code
def pick(record, field, default=None): """Get the value for a key in a dict, or None if there is no dict. !!! warning But if the value for `field` in the record is `None`, `None` will be returned. Parameters ---------- record: dict | `None` `pick` should work in both cases. field: string The field in `record` we want to extract. default: mixed Default value. Returns ------- value | `None` The value is the default if the record is `None`, or if the record has no `field`. Otherwise it is the value for `field` from the record. """ return default if record is None else record.get(field, default)
def saveParam(v)
-
Expand source code
def saveParam(v): if not v: return "" if len(v) < 30: return v return f"{v[0:10]} ... {v[-10:]}"
def serverprint(*msg)
-
Print a message to the console immediately.
Expand source code
def serverprint(*msg): """Print a message to the console immediately.""" sys.stdout.write(f"""{" ".join(msg)}{NL}""") sys.stdout.flush()
def shiftRegional(iso)
-
Transpose iso country code into flag.
By shifting the 2-letter iso country code with a fixed offset, we get two Unicode characters that browsers know to render as a flag symbol for that country.
Parameters
iso
:string
- 2-letter iso country code.
Returns
flag:string
- 2-letter unicode, starting from `REGION_SHIFT`.
Expand source code
def shiftRegional(iso): """Transpose iso country code into flag. By shifting the 2-letter iso country code with a fixed offset, we get two Unicode characters that browsers know to render as a flag symbol for that country. Parameters ---------- iso: string 2-letter iso country code. Returns ------- flag:string 2-letter unicode, starting from `control.utils.REGION_SHIFT`. """ return E.join(chr(ord(r) + REGION_SHIFT) for r in iso)
def thinM(chunks)
-
Auxiliary in provenance filtering: weed out the non-last items per day.
Expand source code
def thinM(chunks): """Auxiliary in provenance filtering: weed out the non-last items per day.""" modified = [] nChunks = len(chunks) for (i, chunk) in enumerate(chunks): isLast = i == nChunks - 1 people = {} for m in chunk: people.setdefault(m[0], []).append(m[1]) thinned = [] for (p, dates) in people.items(): thinned.append((p, sorted(dates)[-1])) for m in sorted(thinned, key=lambda x: x[1]): modified.append((m, 2 if isLast else 1)) return modified
def thisYear()
-
The current year as number.
Expand source code
def thisYear(): """The current year as number.""" return dt.utcnow().year
def trimM(mdt, trim)
-
Auxiliary in provenance filtering: trim the seconds part.
Parameters
mdt
:string
- Modification date in iso shape.
trim
:boolean
- Whether or not to trim the decimal parts of the seconds away.
Expand source code
def trimM(mdt, trim): """Auxiliary in provenance filtering: trim the secoonds part. Parameters ---------- mdt: string Modification date in iso shape. trim: boolean Whether or not to trim the decimal parts of the seconds aways. """ return str(mdt).split(BLANK)[0] if trim == 1 else str(mdt).split(DOT)[0]
def utf8FromLatin1(s)
-
Get Unicode from a latin1 string.
Hint
Needed to process the values of environment variables, in particular those from the identity provider.
Parameters
s
:string(latin1)
Returns
string(utf8)
Expand source code
def utf8FromLatin1(s): """Get Unicode from a latin1 string. !!! hint Needed to process the values of environment variables, in particular those from the identity provider.. Parameters ---------- s: string(latin1) Returns ------- string(utf8) """ return str(bytes(s, encoding=LATIN1), encoding=UTF8)
Classes
class MongoJSONEncoder (*args, **kwargs)
-
Extensible JSON http://json.org encoder for Python data structures.
Supports the following objects and types by default:
+-------------------+---------------+ | Python | JSON | +===================+===============+ | dict | object | +-------------------+---------------+ | list, tuple | array | +-------------------+---------------+ | str | string | +-------------------+---------------+ | int, float | number | +-------------------+---------------+ | True | true | +-------------------+---------------+ | False | false | +-------------------+---------------+ | None | null | +-------------------+---------------+
To extend this to recognize other objects, subclass and implement a `.default()` method with another method that returns a serializable object for `o` if possible, otherwise it should call the superclass implementation (to raise `TypeError`).
Constructor for JSONEncoder, with sensible defaults.
If skipkeys is false, then it is a TypeError to attempt encoding of keys that are not str, int, float or None. If skipkeys is True, such items are simply skipped.
If ensure_ascii is true, the output is guaranteed to be str objects with all incoming non-ASCII characters escaped. If ensure_ascii is false, the output can contain non-ASCII characters.
If check_circular is true, then lists, dicts, and custom encoded objects will be checked for circular references during encoding to prevent an infinite recursion (which would cause an RecursionError). Otherwise, no such check takes place.
If allow_nan is true, then NaN, Infinity, and -Infinity will be encoded as such. This behavior is not JSON specification compliant, but is consistent with most JavaScript based encoders and decoders. Otherwise, it will be a ValueError to encode such floats.
If sort_keys is true, then the output of dictionaries will be sorted by key; this is useful for regression tests to ensure that JSON serializations can be compared on a day-to-day basis.
If indent is a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines. None is the most compact representation.
If specified, separators should be an (item_separator, key_separator) tuple. The default is (', ', ': ') if indent is `None` and (',', ': ') otherwise. To get the most compact JSON representation, you should specify (',', ':') to eliminate whitespace.
If specified, default is a function that gets called for objects that can't otherwise be serialized. It should return a JSON encodable version of the object or raise a `TypeError`.
Expand source code
class MongoJSONEncoder(JSONEncoder): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def default(self, obj): if isinstance(obj, dt): return obj.isoformat() elif isinstance(obj, ObjectId): return str(obj) return JSONEncoder.default(self, obj)
Ancestors
- json.encoder.JSONEncoder
Methods
def default(self, obj)
-
Implement this method in a subclass such that it returns a serializable object for `o`, or calls the base implementation (to raise a `TypeError`).
For example, to support arbitrary iterators, you could implement default like this::
def default(self, o): try: iterable = iter(o) except TypeError: pass else: return list(iterable) # Let the base class default method raise the TypeError return JSONEncoder.default(self, o)
Expand source code
def default(self, obj): if isinstance(obj, dt): return obj.isoformat() elif isinstance(obj, ObjectId): return str(obj) return JSONEncoder.default(self, obj)