server-env/data_encryption/models/encrypted_data.py

157 lines
4.9 KiB
Python

# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
import json
from odoo import api, fields, models
from odoo.exceptions import AccessError, ValidationError
from odoo.tools.config import config
from odoo.tools.translate import _
from odoo.tools import ormcache
_logger = logging.getLogger(__name__)
try:
from cryptography.fernet import Fernet, InvalidToken
except ImportError as err: # pragma: no cover
_logger.debug(err)
class EncryptedData(models.Model):
    """Model to store encrypted data by environment (prod, preprod...).

    A record is identified by its ``name`` and ``environment`` (the pair
    is unique). The payload is encrypted with the Fernet key read from
    the ``encryption_key_<environment>`` entry of the Odoo config file.
    The ``_encrypted_get`` / ``_encrypted_store`` helpers refuse to run
    for anyone but the superuser.
    """

    _name = "encrypted.data"
    _description = "Store any encrypted data by environment"

    name = fields.Char(
        required=True, readonly=True, index=True, help="Technical name"
    )
    environment = fields.Char(
        required=True,
        index=True,
        help="Concerned Odoo environment (prod, preprod...)",
    )
    # Fernet token produced by ``_encrypt_data``.
    encrypted_data = fields.Binary()

    _sql_constraints = [
        (
            "name_environment_uniq",
            "unique (name, environment)",
            # Adjacent literals instead of a backslash continuation, so
            # source indentation never leaks into the message.
            "You can not store multiple encrypted data for the same "
            "record and environment",
        )
    ]

    def _decrypt_data(self, env):
        """Return the decrypted payload of this single record as ``str``.

        :param env: environment name whose key is used for decryption
        :raises ValidationError: when the stored token cannot be
            decrypted with the key configured for ``env``
        """
        self.ensure_one()
        cipher = self._get_cipher(env)
        try:
            return cipher.decrypt(self.encrypted_data).decode()
        except InvalidToken as err:
            raise ValidationError(
                _(
                    "Password has been encrypted with a different "
                    "key. Unless you can recover the previous key, "
                    "this password is unreadable."
                )
            ) from err

    @api.model
    @ormcache("self._uid", "name", "env")
    def _encrypted_get(self, name, env=None):
        """Return the decrypted data stored under ``name``.

        The result is cached per user id, ``name`` and ``env`` argument.

        :param name: technical name of the encrypted record
        :param env: environment name; defaults to the running one
        :return: decrypted text, or ``None`` when no record matches
        :raises AccessError: when the current user is not the superuser
        """
        if self.env.context.get("bin_size"):
            # Read the real binary content, not its size.
            self = self.with_context(bin_size=False)
        if not self.env.user._is_superuser():
            raise AccessError(
                _("Encrypted data can only be read as superuser")
            )
        if not env:
            env = self._retrieve_env()
        encrypted_rec = self.search(
            [("name", "=", name), ("environment", "=", env)]
        )
        if not encrypted_rec:
            return None
        return encrypted_rec._decrypt_data(env)

    @api.model
    @ormcache("self._uid", "name", "env")
    def _encrypted_read_json(self, name, env=None):
        """Return the data stored under ``name`` parsed as JSON.

        NOTE(review): the result is cached by ``ormcache``; callers
        should presumably treat the returned object as read-only.

        :param name: technical name of the encrypted record
        :param env: environment name; defaults to the running one
        :return: the decoded JSON value, or ``{}`` when nothing (or an
            empty value) is stored
        :raises ValidationError: when the stored data is not valid JSON
        """
        data = self._encrypted_get(name, env=env)
        if not data:
            return {}
        try:
            return json.loads(data)
        except (ValueError, TypeError) as err:
            raise ValidationError(
                _("The data you are trying to read are not in a json format")
            ) from err

    @staticmethod
    def _retrieve_env():
        """Return the current environment (``running_env`` config entry).

        :raises ValidationError: when ``running_env`` is missing or empty
        """
        current = config.get("running_env", False)
        if not current:
            raise ValidationError(
                _(
                    "No environment found, please check your running_env "
                    "entry in your config file."
                )
            )
        return current

    @classmethod
    def _get_cipher(cls, env):
        """Return a Fernet cipher built from the key of environment ``env``.

        The key is read from the ``encryption_key_<env>`` config entry.

        :param env: name of the environment whose key must be used
        :raises ValidationError: when no key is configured for ``env``;
            the message suggests a freshly generated key
        """
        key_name = "encryption_key_%s" % env
        key_str = config.get(key_name)
        if not key_str:
            raise ValidationError(
                _(
                    "No '%s' entry found in config file. "
                    "Use a key similar to: %s"
                )
                # generate_key() returns bytes: decode it so the hint is
                # a plain string that can be pasted as-is, not "b'...'".
                % (key_name, Fernet.generate_key().decode())
            )
        # key should be in bytes format
        key = key_str.encode()
        return Fernet(key)

    @api.model
    def _encrypt_data(self, data, env):
        """Encrypt ``data`` with the key of environment ``env``.

        :param data: ``str`` or ``bytes`` payload; a falsy non-bytes
            value (``None``, ``False``, ``""``) is encrypted as empty
        :param env: name of the environment whose key must be used
        :return: the Fernet token (bytes)
        """
        cipher = self._get_cipher(env)
        if not isinstance(data, bytes):
            # Fernet only accepts bytes; normalise empty values first so
            # that storing "nothing" does not crash.
            data = (data or "").encode()
        return cipher.encrypt(data)

    @api.model
    def _encrypted_store(self, name, data, env=None):
        """Encrypt ``data`` and store it under ``name`` for ``env``.

        An existing record for the same name/environment is overwritten,
        otherwise a new one is created. Both read caches are cleared so
        that later reads do not return a stale value.

        :param name: technical name of the encrypted record
        :param data: ``str`` or ``bytes`` payload to encrypt
        :param env: environment name; defaults to the running one
        :raises AccessError: when the current user is not the superuser
        """
        if not self.env.user._is_superuser():
            raise AccessError(_("You can only encrypt data as superuser"))
        if not env:
            env = self._retrieve_env()
        encrypted_data = self._encrypt_data(data, env)
        existing_data = self.search(
            [("name", "=", name), ("environment", "=", env)]
        )
        if existing_data:
            existing_data.write({"encrypted_data": encrypted_data})
        else:
            self.create(
                {
                    "name": name,
                    "environment": env,
                    "encrypted_data": encrypted_data,
                }
            )
        self._encrypted_get.clear_cache(self)
        self._encrypted_read_json.clear_cache(self)

    @api.model
    def _encrypted_store_json(self, name, json_data, env=None):
        """Serialize ``json_data`` to JSON and store it encrypted.

        :param name: technical name of the encrypted record
        :param json_data: any value accepted by ``json.dumps``
        :param env: environment name; defaults to the running one
        """
        return self._encrypted_store(name, json.dumps(json_data), env=env)