from typing import Any, Union, Dict, List, Set, Tuple, NoReturn, ClassVar, TextIO, Callable
import base64
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from collections import OrderedDict
from .packing import json_pack, json_unpack, SERIALIZABLE, PACKED, JSONABLE
from .errors import WrongKeyError, UnknownUserError, UnknownActionError, InsufficientPermissionsError
from cryptography.fernet import Fernet, InvalidToken, InvalidSignature
from crypt import crypt, mksalt
from getpass import getpass
# from hmac import compare_digest
_master_salt = '6FwLrxJb5mTPVwthumpackMASTERsalt'
[docs]def secure_key(word: str, salt: str = None) -> str:
'''
Get a hash from the raw text password
:param word: raw text password
:param salt: random salt to seed the hash computation
:return: hash of the password
'''
if salt is None:
salt = _master_salt
hsh = crypt(word, salt)
return hsh
[docs]def prompt_password_hash(salt: str = None) -> str:
'''
Get the hash of a password which is entered by the user in a prompt
:param salt: random salt for hash computation
:return: hash of the entered password
'''
if salt is None:
salt = _master_salt
hsh = secure_key(getpass(), salt=salt)
return hsh
[docs]def encrypt(data: bytes, hsh: Union[str, bytes] = None) -> bytes:
'''
Encrypt the data from a provided hash (or prompt for a password if no hash is provided)
:param data: bytes to be encrypted
:param hsh: hash used as key to encrypt
:return: encrypted data bytes
'''
if hsh is None:
hsh = prompt_password_hash()
key = format_key(hsh)
f = Fernet(key)
return f.encrypt(data)
[docs]def decrypt(data: bytes, hsh: Union[str, bytes] = None) -> bytes:
'''
Decrypt data with provided hash (or prompt for a password to compute hash if no hash is provided)
:param data: encrypted bytes that should be decrypted
:param hsh: hash of a password to be used as a key to decrypt (obviously, must be the same as was used to encrypt)
:return: the decrypted data, or a WrongKeyError if the key failed
'''
if hsh is None:
hsh = prompt_password_hash()
key = format_key(hsh)
f = Fernet(key)
try:
return f.decrypt(data)
except (InvalidToken, InvalidSignature):
pass
raise WrongKeyError
[docs]def secure_pack(obj: SERIALIZABLE, hsh: Union[str, bytes] = None,
meta: Dict[str,JSONABLE] = None, include_timestamp:bool = False) -> bytes:
'''
Pack the object and encrypt it using the provided hash (prompt user for password if none is provided)
:param obj: object to be packed
:param hsh: hash used as key to encrypt
:param meta: meta information to store with the packed `obj`
:param include_timestamp: include timestamp in meta info
:return: encrypted bytes
'''
data = json_pack(obj, meta=meta, include_timestamp=include_timestamp)
data = data.encode('latin1')
if hsh is None:
hsh = prompt_password_hash()
return encrypt(data, hsh)
[docs]def secure_unpack(data: bytes, hsh: Union[str, bytes] = None,
return_meta: bool = False) -> SERIALIZABLE:
'''
Decrypt `data` and unpack to recover the original object using the provided hash as a key
:param data: encrypted bytes
:param hsh: hash to be used as a key to decrypt (prompt user, if not provided)
:param return_meta: include meta info in output
:return: decrypted and unpacked object, possibly including the meta info
'''
if hsh is None:
hsh = prompt_password_hash()
data = decrypt(data, hsh).decode('latin1')
data = json_unpack(data, return_meta=return_meta)
return data
class Permission_Handler: # TODO: make transactionable, actually just move to humpack
def __init__(self, power_hierarchy={}, default_roles=[], god=None):
self.god = god # separate role that can't be modified by clients
self.users = OrderedDict() # user -> roles
self.powers = OrderedDict() # user -> power
self._power_hierarchy = OrderedDict() # role -> power
self.roles = OrderedDict() # role -> users
self.update_roles(**power_hierarchy)
self.default_roles = [role for role in default_roles if role in self.roles] # for new users
self.permissions = OrderedDict() # action -> roles
self.actions = OrderedDict() # action -> power
def _update_consistency(self, users=[], roles=[]):
if not (len(users) or len(roles)):
users = list(self.users.keys())
roles = list(self.roles.keys())
for user in users:
for role, usrs in self.roles.items():
if role not in self.users[user] and user in usrs:
self.users[user].add(role)
elif role in self.users[user] and user not in usrs:
self.users[user].remove(role)
for role in roles:
for user, rls in self.users.items():
if user not in self.roles[role] and role in rls:
self.roles[role].add(user)
elif user in self.roles[role] and role not in rls:
self.roles[role].remove(role)
def __contains__(self, user):
return self.contains_user(user)
def contains_user(self, user):
return user in self.users or user in self.powers
def contains_action(self, action):
return action in self.actions or action in self.permissions
def _update_role(self, name, power):
if name not in self.roles:
self.roles[name] = set()
self._power_hierarchy[name] = power
def update_role(self, role, power):
self._update_role(role, power)
self._update_consistency(role=[role])
def update_roles(self, **roles):
for name, power in roles.items():
self._update_role(name, power)
self._update_consistency(role=roles)
def update_user(self, user, *roles, power=None):
if power is not None:
self.powers[user] = power
if len(roles):
if user not in self.users:
self.users[user] = set()
self.users[user].update(roles)
def new_user(self, user, *roles, power=None):
roles = set(roles)
roles.update(self.default_roles)
self.update_user(user, *roles, power=power)
def set_user(self, user, *roles, power=None):
if power is None and user in self.powers:
del self.powers[user]
self.update_user(user, *roles, power=power)
def remove_user(self, user):
if user in self.users:
del self.users[user]
if user in self.powers:
del self.powers[user]
def update_action(self, action, *roles, power=None):
if power is not None:
self.actions[action] = power
if len(roles):
if action not in self.permissions:
self.permissions[action] = set()
self.permissions[action].update(roles)
def new_action(self, action, *roles, power=None):
self.update_action(action, *roles, power=power)
def set_action(self, action, *roles, power=None):
if power is None and action in self.actions:
del self.actions[action]
self.update_action(action, *roles, power=power)
def remove_action(self, action):
if action in self.actions:
del self.actions[action]
if action in self.permissions:
del self.permissions[action]
def validate(self, user, action=None):
if user == self.god:
return user
if self.contains_user(user):
raise UnknownUserError(user)
if action is None:
return user
if not self.contains_action(action):
raise UnknownActionError(action)
if user in self.users and action in self.permissions \
and len(self.users[user].intersection(self.permissions[action])):
return user
if user in self.powers and action in self.actions \
and self.powers[user] >= self.actions[action]:
return user
if user in self.users and action in self.actions:
power = max(self._power_hierarchy.get(role, 0) for role in self.users[user])
if power >= self.actions[action]:
return user
if user in self.powers and action in self.permissions:
req = min(self._power_hierarchy.get(role, 0) for role in self.permissions[action])
if self.powers[user] >= req:
return user
raise InsufficientPermissionsError(user, action)