# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import json import logging from odoo import api, fields, models from odoo.exceptions import AccessError, ValidationError from odoo.tools import ormcache from odoo.tools.config import config from odoo.tools.translate import _ _logger = logging.getLogger(__name__) try: from cryptography.fernet import Fernet, InvalidToken except ImportError as err: # pragma: no cover _logger.debug(err) class EncryptedData(models.Model): """Model to store encrypted data by environment (prod, preprod...)""" _name = "encrypted.data" _description = "Store any encrypted data by environment" name = fields.Char(required=True, readonly=True, index=True, help="Technical name") environment = fields.Char( required=True, index=True, help="Concerned Odoo environment (prod, preprod...)", ) encrypted_data = fields.Binary(attachment=False) _sql_constraints = [ ( "name_environment_uniq", "unique (name, environment)", "You can not store multiple encrypted data for the same record and \ environment", ) ] def _decrypt_data(self, env): self.ensure_one() cipher = self._get_cipher(env) try: return cipher.decrypt(self.encrypted_data).decode() except InvalidToken as exc: raise ValidationError( _( "Password has been encrypted with a different " "key. Unless you can recover the previous key, " "this password is unreadable." ) ) from exc @api.model @ormcache("self._uid", "name", "env") def _encrypted_get(self, name, env=None): if self.env.context.get("bin_size"): self = self.with_context(bin_size=False) if not self.env.su: raise AccessError( _("Encrypted data can only be read with suspended security (sudo)") ) if not env: env = self._retrieve_env() encrypted_rec = self.search([("name", "=", name), ("environment", "=", env)]) if not encrypted_rec: return None return encrypted_rec._decrypt_data(env) @api.model @ormcache("self._uid", "name", "env") def _encrypted_read_json(self, name, env=None): data = self._encrypted_get(name, env=env) if not data: return {} try: return json.loads(data) except (ValueError, TypeError) as exc: raise ValidationError( _("The data you are trying to read are not in a json format") ) from exc @staticmethod def _retrieve_env(): """Return the current environment Raise if none is found """ current = config.get("running_env", False) if not current: raise ValidationError( _( "No environment found, please check your running_env " "entry in your config file." ) ) return current @classmethod def _get_cipher(cls, env): """Return a cipher using the key of environment. force_env = name of the env key. Useful for encoding against one precise env """ key_name = "encryption_key_%s" % env key_str = config.get(key_name) if not key_str: raise ValidationError( _( "No '%(key_name)s' entry found in config file. " "Use a key similar to: %(key)s" ) % {"key_name": key_name, "key": Fernet.generate_key()} ) # key should be in bytes format key = key_str.encode() return Fernet(key) @api.model def _encrypt_data(self, data, env): cipher = self._get_cipher(env) if not isinstance(data, bytes): data = data.encode() return cipher.encrypt(data or "") @api.model def _encrypted_store(self, name, data, env=None): if not self.env.su: raise AccessError( _("You can only encrypt data with suspended security (sudo)") ) if not env: env = self._retrieve_env() encrypted_data = self._encrypt_data(data, env) existing_data = self.search([("name", "=", name), ("environment", "=", env)]) if existing_data: existing_data.write({"encrypted_data": encrypted_data}) else: self.create( {"name": name, "environment": env, "encrypted_data": encrypted_data} ) self._encrypted_get.clear_cache(self) self._encrypted_read_json.clear_cache(self) @api.model def _encrypted_store_json(self, name, json_data, env=None): return self._encrypted_store(name, json.dumps(json_data), env=env)