[12.0] Add data_encryption

This module come from the previous module keychain. It has been heavely refactored
to be compatible with server_environment module. For this it has been split into
2 modules data_encryption and server_environment_data_encryption
This commit is contained in:
Florian da Costa 2019-04-11 17:13:51 +02:00
parent 42ae968171
commit 0e0595f441
12 changed files with 336 additions and 0 deletions

View File

@ -0,0 +1 @@
from . import models

View File

@ -0,0 +1,24 @@
# Copyright <2019> Akretion
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
{
"name": "Encryption data",
"summary": "Store accounts and credentials encrypted by environment",
"version": "12.0.1.0.0",
"development_status": 'Alpha',
"category": "Tools",
"website": "https://github/oca/server-env",
"author": "Akretion, Odoo Community Association (OCA)",
"license": "AGPL-3",
"application": False,
"installable": True,
"external_dependencies": {
"python": [
'cryptography'],
},
"depends": [
"base",
],
"data": [
"security/ir.model.access.csv",
],
}

View File

@ -0,0 +1 @@
from . import encrypted_data

View File

@ -0,0 +1,138 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
import json
from odoo import api, fields, models
from odoo.exceptions import AccessError, ValidationError
from odoo.tools.config import config
from odoo.tools.translate import _
from odoo.tools import ormcache
_logger = logging.getLogger(__name__)
try:
from cryptography.fernet import Fernet, InvalidToken
except ImportError as err: # pragma: no cover
_logger.debug(err)
class EncryptedData(models.Model):
"""Model to store encrypted data by environment (prod, preprod...)"""
_name = 'encrypted.data'
_description = 'Store any encrypted data by environment'
name = fields.Char(
required=True, readonly=True, index=True,
help="Technical name")
environment = fields.Char(
required=True,
index=True,
help="Concerned Odoo environment (prod, preprod...)")
encrypted_data = fields.Binary()
_sql_constraints = [
('name_environment_uniq', 'unique (name, environment)',
'You can not store multiple encrypted data for the same record and \
environment')
]
def _decrypt_data(self, env):
self.ensure_one()
cipher = self._get_cipher(env)
try:
return cipher.decrypt(self.encrypted_data).decode()
except InvalidToken:
raise ValidationError(_(
"Password has been encrypted with a different "
"key. Unless you can recover the previous key, "
"this password is unreadable."))
@api.model
@ormcache('self._uid', 'name', 'env')
def _encrypted_get(self, name, env=None):
if self.env.context.get('bin_size'):
self = self.with_context(bin_size=False)
if not self.env.user._is_superuser():
raise AccessError(
_("Encrypted data can only be read as superuser"))
if not env:
env = self._retrieve_env()
encrypted_rec = self.search(
[('name', '=', name), ('environment', '=', env)])
if not encrypted_rec:
return None
return encrypted_rec._decrypt_data(env)
@api.model
@ormcache('self._uid', 'name', 'env')
def _encrypted_read_json(self, name, env=None):
data = self._encrypted_get(name, env=env)
if not data:
return {}
try:
return json.loads(data)
except (ValueError, TypeError):
raise ValidationError(
_("The data you are trying to read are not in a json format"))
@staticmethod
def _retrieve_env():
"""Return the current environment
Raise if none is found
"""
current = config.get('running_env', False)
if not current:
raise ValidationError(
_('No environment found, please check your running_env '
'entry in your config file.'))
return current
@classmethod
def _get_cipher(cls, env):
"""Return a cipher using the key of environment.
force_env = name of the env key.
Useful for encoding against one precise env
"""
key_name = 'encryption_key_%s' % env
key_str = config.get(key_name)
if not key_str:
raise ValidationError(_(
"No '%s' entry found in config file. "
"Use a key similar to: %s") % (key_name, Fernet.generate_key())
)
# key should be in bytes format
key = key_str.encode()
return Fernet(key)
@api.model
def _encrypt_data(self, data, env):
cipher = self._get_cipher(env)
if not isinstance(data, bytes):
data = data.encode()
return cipher.encrypt(data or '')
@api.model
def _encrypted_store(self, name, data, env=None):
if not self.env.user._is_superuser():
raise AccessError(
_("You can only encrypt data as superuser"))
if not env:
env = self._retrieve_env()
encrypted_data = self._encrypt_data(data, env)
existing_data = self.search(
[('name', '=', name), ('environment', '=', env)])
if existing_data:
existing_data.write({'encrypted_data': encrypted_data})
else:
self.create({
'name': name,
'environment': env,
'encrypted_data': encrypted_data,
})
self._encrypted_get.clear_cache(self)
self._encrypted_read_json.clear_cache(self)
@api.model
def _encrypted_store_json(self, name, json_data, env=None):
return self._encrypted_store(name, json.dumps(json_data), env=env)

View File

@ -0,0 +1,21 @@
To configure this module, you need to edit the main configuration file
of your instance, and add a directive called ``running_env``. Commonly
used values are 'dev', 'test', 'production'::
[options]
running_env=dev
You also need to set the encryption key(s). The main idea is to have different
encryption keys for your different environment, to avoid the possibility to retrieve
crucial information from the production environment in a developement environment, for instance.
So, if your running environment is 'dev'::
[options]
encryption_key_dev=fyeMIx9XVPBBky5XZeLDxVc9dFKy7Uzas3AoyMarHPA=
In the configuration file of your production environment, you may want to configure
all your other environments encryption key. This way, from production you can encrypt and decrypt
data for all environments.
You can generate keys with python -c 'from cryptography.fernet import Fernet; print Fernet.generate_key()'.

View File

@ -0,0 +1,2 @@
* Raphaël Reverdy <raphael.reverdy@akretion.com>
* Florian da Costa <florian.dacosta@akretion.com>

View File

@ -0,0 +1,3 @@
This module allows to encrypt and decrypt data. This module is not usable
by itself, it is a low level module which should work as a base for others.
An example is the module server_environment_data_encryption

View File

@ -0,0 +1,5 @@
For now the encryption is dependent on the environment. It has been designed
to store the same kind of data with different values depending on the environement
(dev, preprod, prod...).
An improvement could be to split this in 2 modules. But the environment stuff
is not a big constraint.

View File

@ -0,0 +1,2 @@
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_encrypted_data,access_encrypted_data,model_encrypted_data,base.group_system,0,0,0,0
1 id name model_id:id group_id:id perm_read perm_write perm_create perm_unlink
2 access_encrypted_data access_encrypted_data model_encrypted_data base.group_system 0 0 0 0

View File

@ -0,0 +1 @@
from . import test_data_encrypt

View File

@ -0,0 +1,39 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from odoo.tests.common import TransactionCase
from odoo.tools.config import config
import logging
_logger = logging.getLogger(__name__)
try:
from cryptography.fernet import Fernet
except ImportError as err: # pragma: no cover
_logger.debug(err)
class CommonDataEncrypted(TransactionCase):
def setUp(self):
super().setUp()
self.encrypted_data = self.env['encrypted.data']
self.set_new_key_env('test')
self.old_running_env = config.get('running_env', '')
config['running_env'] = 'test'
self.crypted_data_name = 'test_model,1'
def set_new_key_env(self, environment):
crypting_key = Fernet.generate_key()
# The key is encoded to bytes in the module, because in real life
# the key com from the config file and is not in a binary format.
# So we decode here to avoid having a special behavior because of
# the tests.
config['encryption_key_{}'.format(environment)] = \
crypting_key.decode()
def tearDown(self):
config['running_env'] = self.old_running_env
return super().tearDown()

View File

@ -0,0 +1,99 @@
# © 2016 Akretion Raphaël REVERDY
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from .common import CommonDataEncrypted
from odoo.tools.config import config
from odoo.exceptions import AccessError, ValidationError
import logging
_logger = logging.getLogger(__name__)
try:
from cryptography.fernet import Fernet
except ImportError as err: # pragma: no cover
_logger.debug(err)
class TestDataEncrypted(CommonDataEncrypted):
def test_store_data_no_superuser(self):
# only superuser can use this model
admin = self.env.ref('base.user_admin')
with self.assertRaises(AccessError):
self.encrypted_data.sudo(admin.id)._encrypted_store(
self.crypted_data_name, "My config")
def test_store_data_noenv_set(self):
config.pop('running_env', None)
with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_store(
self.crypted_data_name, "My config")
def test_store_data_nokey_set(self):
config.pop('encryption_key_test', None)
with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_store(
self.crypted_data_name, "My config")
def test_get_data_decrypted_and_cache(self):
self.encrypted_data.sudo()._encrypted_store(
'test_model,1', "My config")
data = self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name)
self.assertEqual(data, "My config")
# Test cache really depends on user (super user) else any user could
# access the data
admin = self.env.ref('base.user_admin')
with self.assertRaises(AccessError):
self.encrypted_data.sudo(admin)._encrypted_get(
self.crypted_data_name)
# Change value should invalidate cache
self.encrypted_data.sudo()._encrypted_store(
'test_model,1', "Other Config")
new_data = self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name)
self.assertEqual(new_data, "Other Config")
def test_get_data_wrong_key(self):
self.encrypted_data.sudo()._encrypted_store(
'test_model,1', "My config")
new_key = Fernet.generate_key()
config['encryption_key_test'] = new_key.decode()
with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name)
def test_get_empty_data(self):
empty_data = self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name)
self.assertEqual(empty_data, None)
def test_get_wrong_json(self):
self.encrypted_data.sudo()._encrypted_store(
self.crypted_data_name, 'config')
with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_read_json(
self.crypted_data_name)
def test_get_good_json(self):
self.encrypted_data.sudo()._encrypted_store_json(
self.crypted_data_name, {'key': 'value'})
data = self.encrypted_data.sudo()._encrypted_read_json(
self.crypted_data_name)
self.assertEqual(data, {'key': 'value'})
def test_get_empty_json(self):
data = self.encrypted_data.sudo()._encrypted_read_json(
self.crypted_data_name)
self.assertEqual(data, {})
def test_get_data_with_bin_size_context(self):
self.encrypted_data.sudo()._encrypted_store(
self.crypted_data_name, "test")
data = self.encrypted_data.sudo().with_context(bin_size=True).\
_encrypted_get(self.crypted_data_name)
self.assertEqual(data, "test")