commit
076339f19d
|
|
@ -0,0 +1 @@
|
||||||
|
from . import models
|
||||||
|
|
@ -0,0 +1,24 @@
|
||||||
|
# Copyright <2019> Akretion
|
||||||
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
||||||
|
{
|
||||||
|
"name": "Encryption data",
|
||||||
|
"summary": "Store accounts and credentials encrypted by environment",
|
||||||
|
"version": "12.0.1.0.0",
|
||||||
|
"development_status": 'Alpha',
|
||||||
|
"category": "Tools",
|
||||||
|
"website": "https://github/oca/server-env",
|
||||||
|
"author": "Akretion, Odoo Community Association (OCA)",
|
||||||
|
"license": "AGPL-3",
|
||||||
|
"application": False,
|
||||||
|
"installable": True,
|
||||||
|
"external_dependencies": {
|
||||||
|
"python": [
|
||||||
|
'cryptography'],
|
||||||
|
},
|
||||||
|
"depends": [
|
||||||
|
"base",
|
||||||
|
],
|
||||||
|
"data": [
|
||||||
|
"security/ir.model.access.csv",
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1 @@
|
||||||
|
from . import encrypted_data
|
||||||
|
|
@ -0,0 +1,138 @@
|
||||||
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
|
||||||
|
from odoo import api, fields, models
|
||||||
|
from odoo.exceptions import AccessError, ValidationError
|
||||||
|
from odoo.tools.config import config
|
||||||
|
from odoo.tools.translate import _
|
||||||
|
from odoo.tools import ormcache
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.fernet import Fernet, InvalidToken
|
||||||
|
except ImportError as err: # pragma: no cover
|
||||||
|
_logger.debug(err)
|
||||||
|
|
||||||
|
|
||||||
|
class EncryptedData(models.Model):
|
||||||
|
"""Model to store encrypted data by environment (prod, preprod...)"""
|
||||||
|
|
||||||
|
_name = 'encrypted.data'
|
||||||
|
_description = 'Store any encrypted data by environment'
|
||||||
|
|
||||||
|
name = fields.Char(
|
||||||
|
required=True, readonly=True, index=True,
|
||||||
|
help="Technical name")
|
||||||
|
environment = fields.Char(
|
||||||
|
required=True,
|
||||||
|
index=True,
|
||||||
|
help="Concerned Odoo environment (prod, preprod...)")
|
||||||
|
encrypted_data = fields.Binary()
|
||||||
|
|
||||||
|
_sql_constraints = [
|
||||||
|
('name_environment_uniq', 'unique (name, environment)',
|
||||||
|
'You can not store multiple encrypted data for the same record and \
|
||||||
|
environment')
|
||||||
|
]
|
||||||
|
|
||||||
|
def _decrypt_data(self, env):
|
||||||
|
self.ensure_one()
|
||||||
|
cipher = self._get_cipher(env)
|
||||||
|
try:
|
||||||
|
return cipher.decrypt(self.encrypted_data).decode()
|
||||||
|
except InvalidToken:
|
||||||
|
raise ValidationError(_(
|
||||||
|
"Password has been encrypted with a different "
|
||||||
|
"key. Unless you can recover the previous key, "
|
||||||
|
"this password is unreadable."))
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
@ormcache('self._uid', 'name', 'env')
|
||||||
|
def _encrypted_get(self, name, env=None):
|
||||||
|
if self.env.context.get('bin_size'):
|
||||||
|
self = self.with_context(bin_size=False)
|
||||||
|
if not self.env.user._is_superuser():
|
||||||
|
raise AccessError(
|
||||||
|
_("Encrypted data can only be read as superuser"))
|
||||||
|
if not env:
|
||||||
|
env = self._retrieve_env()
|
||||||
|
encrypted_rec = self.search(
|
||||||
|
[('name', '=', name), ('environment', '=', env)])
|
||||||
|
if not encrypted_rec:
|
||||||
|
return None
|
||||||
|
return encrypted_rec._decrypt_data(env)
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
@ormcache('self._uid', 'name', 'env')
|
||||||
|
def _encrypted_read_json(self, name, env=None):
|
||||||
|
data = self._encrypted_get(name, env=env)
|
||||||
|
if not data:
|
||||||
|
return {}
|
||||||
|
try:
|
||||||
|
return json.loads(data)
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
raise ValidationError(
|
||||||
|
_("The data you are trying to read are not in a json format"))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _retrieve_env():
|
||||||
|
"""Return the current environment
|
||||||
|
Raise if none is found
|
||||||
|
"""
|
||||||
|
current = config.get('running_env', False)
|
||||||
|
if not current:
|
||||||
|
raise ValidationError(
|
||||||
|
_('No environment found, please check your running_env '
|
||||||
|
'entry in your config file.'))
|
||||||
|
return current
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def _get_cipher(cls, env):
|
||||||
|
"""Return a cipher using the key of environment.
|
||||||
|
force_env = name of the env key.
|
||||||
|
Useful for encoding against one precise env
|
||||||
|
"""
|
||||||
|
key_name = 'encryption_key_%s' % env
|
||||||
|
key_str = config.get(key_name)
|
||||||
|
if not key_str:
|
||||||
|
raise ValidationError(_(
|
||||||
|
"No '%s' entry found in config file. "
|
||||||
|
"Use a key similar to: %s") % (key_name, Fernet.generate_key())
|
||||||
|
)
|
||||||
|
# key should be in bytes format
|
||||||
|
key = key_str.encode()
|
||||||
|
return Fernet(key)
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _encrypt_data(self, data, env):
|
||||||
|
cipher = self._get_cipher(env)
|
||||||
|
if not isinstance(data, bytes):
|
||||||
|
data = data.encode()
|
||||||
|
return cipher.encrypt(data or '')
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _encrypted_store(self, name, data, env=None):
|
||||||
|
if not self.env.user._is_superuser():
|
||||||
|
raise AccessError(
|
||||||
|
_("You can only encrypt data as superuser"))
|
||||||
|
if not env:
|
||||||
|
env = self._retrieve_env()
|
||||||
|
encrypted_data = self._encrypt_data(data, env)
|
||||||
|
existing_data = self.search(
|
||||||
|
[('name', '=', name), ('environment', '=', env)])
|
||||||
|
if existing_data:
|
||||||
|
existing_data.write({'encrypted_data': encrypted_data})
|
||||||
|
else:
|
||||||
|
self.create({
|
||||||
|
'name': name,
|
||||||
|
'environment': env,
|
||||||
|
'encrypted_data': encrypted_data,
|
||||||
|
})
|
||||||
|
self._encrypted_get.clear_cache(self)
|
||||||
|
self._encrypted_read_json.clear_cache(self)
|
||||||
|
|
||||||
|
@api.model
|
||||||
|
def _encrypted_store_json(self, name, json_data, env=None):
|
||||||
|
return self._encrypted_store(name, json.dumps(json_data), env=env)
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
To configure this module, you need to edit the main configuration file
|
||||||
|
of your instance, and add a directive called ``running_env``. Commonly
|
||||||
|
used values are 'dev', 'test', 'production'::
|
||||||
|
|
||||||
|
[options]
|
||||||
|
running_env=dev
|
||||||
|
|
||||||
|
|
||||||
|
You also need to set the encryption key(s). The main idea is to have different
|
||||||
|
encryption keys for your different environment, to avoid the possibility to retrieve
|
||||||
|
crucial information from the production environment in a developement environment, for instance.
|
||||||
|
So, if your running environment is 'dev'::
|
||||||
|
|
||||||
|
[options]
|
||||||
|
encryption_key_dev=fyeMIx9XVPBBky5XZeLDxVc9dFKy7Uzas3AoyMarHPA=
|
||||||
|
|
||||||
|
In the configuration file of your production environment, you may want to configure
|
||||||
|
all your other environments encryption key. This way, from production you can encrypt and decrypt
|
||||||
|
data for all environments.
|
||||||
|
|
||||||
|
You can generate keys with python -c 'from cryptography.fernet import Fernet; print Fernet.generate_key()'.
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
* Raphaël Reverdy <raphael.reverdy@akretion.com>
|
||||||
|
* Florian da Costa <florian.dacosta@akretion.com>
|
||||||
|
|
@ -0,0 +1,3 @@
|
||||||
|
This module allows to encrypt and decrypt data. This module is not usable
|
||||||
|
by itself, it is a low level module which should work as a base for others.
|
||||||
|
An example is the module server_environment_data_encryption
|
||||||
|
|
@ -0,0 +1,5 @@
|
||||||
|
For now the encryption is dependent on the environment. It has been designed
|
||||||
|
to store the same kind of data with different values depending on the environement
|
||||||
|
(dev, preprod, prod...).
|
||||||
|
An improvement could be to split this in 2 modules. But the environment stuff
|
||||||
|
is not a big constraint.
|
||||||
|
|
@ -0,0 +1,2 @@
|
||||||
|
id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
|
||||||
|
access_encrypted_data,access_encrypted_data,model_encrypted_data,base.group_system,0,0,0,0
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
from . import test_data_encrypt
|
||||||
|
|
@ -0,0 +1,39 @@
|
||||||
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
|
|
||||||
|
from odoo.tests.common import TransactionCase
|
||||||
|
from odoo.tools.config import config
|
||||||
|
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
except ImportError as err: # pragma: no cover
|
||||||
|
_logger.debug(err)
|
||||||
|
|
||||||
|
|
||||||
|
class CommonDataEncrypted(TransactionCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super().setUp()
|
||||||
|
|
||||||
|
self.encrypted_data = self.env['encrypted.data']
|
||||||
|
self.set_new_key_env('test')
|
||||||
|
self.old_running_env = config.get('running_env', '')
|
||||||
|
config['running_env'] = 'test'
|
||||||
|
self.crypted_data_name = 'test_model,1'
|
||||||
|
|
||||||
|
def set_new_key_env(self, environment):
|
||||||
|
crypting_key = Fernet.generate_key()
|
||||||
|
# The key is encoded to bytes in the module, because in real life
|
||||||
|
# the key com from the config file and is not in a binary format.
|
||||||
|
# So we decode here to avoid having a special behavior because of
|
||||||
|
# the tests.
|
||||||
|
config['encryption_key_{}'.format(environment)] = \
|
||||||
|
crypting_key.decode()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
config['running_env'] = self.old_running_env
|
||||||
|
return super().tearDown()
|
||||||
|
|
@ -0,0 +1,99 @@
|
||||||
|
# © 2016 Akretion Raphaël REVERDY
|
||||||
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
|
|
||||||
|
from .common import CommonDataEncrypted
|
||||||
|
from odoo.tools.config import config
|
||||||
|
from odoo.exceptions import AccessError, ValidationError
|
||||||
|
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
except ImportError as err: # pragma: no cover
|
||||||
|
_logger.debug(err)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDataEncrypted(CommonDataEncrypted):
|
||||||
|
|
||||||
|
def test_store_data_no_superuser(self):
|
||||||
|
# only superuser can use this model
|
||||||
|
admin = self.env.ref('base.user_admin')
|
||||||
|
with self.assertRaises(AccessError):
|
||||||
|
self.encrypted_data.sudo(admin.id)._encrypted_store(
|
||||||
|
self.crypted_data_name, "My config")
|
||||||
|
|
||||||
|
def test_store_data_noenv_set(self):
|
||||||
|
config.pop('running_env', None)
|
||||||
|
with self.assertRaises(ValidationError):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
self.crypted_data_name, "My config")
|
||||||
|
|
||||||
|
def test_store_data_nokey_set(self):
|
||||||
|
config.pop('encryption_key_test', None)
|
||||||
|
with self.assertRaises(ValidationError):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
self.crypted_data_name, "My config")
|
||||||
|
|
||||||
|
def test_get_data_decrypted_and_cache(self):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
'test_model,1', "My config")
|
||||||
|
data = self.encrypted_data.sudo()._encrypted_get(
|
||||||
|
self.crypted_data_name)
|
||||||
|
self.assertEqual(data, "My config")
|
||||||
|
|
||||||
|
# Test cache really depends on user (super user) else any user could
|
||||||
|
# access the data
|
||||||
|
admin = self.env.ref('base.user_admin')
|
||||||
|
with self.assertRaises(AccessError):
|
||||||
|
self.encrypted_data.sudo(admin)._encrypted_get(
|
||||||
|
self.crypted_data_name)
|
||||||
|
|
||||||
|
# Change value should invalidate cache
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
'test_model,1', "Other Config")
|
||||||
|
new_data = self.encrypted_data.sudo()._encrypted_get(
|
||||||
|
self.crypted_data_name)
|
||||||
|
self.assertEqual(new_data, "Other Config")
|
||||||
|
|
||||||
|
def test_get_data_wrong_key(self):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
'test_model,1', "My config")
|
||||||
|
new_key = Fernet.generate_key()
|
||||||
|
config['encryption_key_test'] = new_key.decode()
|
||||||
|
with self.assertRaises(ValidationError):
|
||||||
|
self.encrypted_data.sudo()._encrypted_get(
|
||||||
|
self.crypted_data_name)
|
||||||
|
|
||||||
|
def test_get_empty_data(self):
|
||||||
|
empty_data = self.encrypted_data.sudo()._encrypted_get(
|
||||||
|
self.crypted_data_name)
|
||||||
|
self.assertEqual(empty_data, None)
|
||||||
|
|
||||||
|
def test_get_wrong_json(self):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
self.crypted_data_name, 'config')
|
||||||
|
with self.assertRaises(ValidationError):
|
||||||
|
self.encrypted_data.sudo()._encrypted_read_json(
|
||||||
|
self.crypted_data_name)
|
||||||
|
|
||||||
|
def test_get_good_json(self):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store_json(
|
||||||
|
self.crypted_data_name, {'key': 'value'})
|
||||||
|
data = self.encrypted_data.sudo()._encrypted_read_json(
|
||||||
|
self.crypted_data_name)
|
||||||
|
self.assertEqual(data, {'key': 'value'})
|
||||||
|
|
||||||
|
def test_get_empty_json(self):
|
||||||
|
data = self.encrypted_data.sudo()._encrypted_read_json(
|
||||||
|
self.crypted_data_name)
|
||||||
|
self.assertEqual(data, {})
|
||||||
|
|
||||||
|
def test_get_data_with_bin_size_context(self):
|
||||||
|
self.encrypted_data.sudo()._encrypted_store(
|
||||||
|
self.crypted_data_name, "test")
|
||||||
|
data = self.encrypted_data.sudo().with_context(bin_size=True).\
|
||||||
|
_encrypted_get(self.crypted_data_name)
|
||||||
|
self.assertEqual(data, "test")
|
||||||
Loading…
Reference in New Issue