[MIG] data_encryption: Migration to 13.0

This commit is contained in:
Thomas Binsfeld 2020-10-02 11:21:36 +02:00 committed by Sébastien BEAU
parent 4e471dd390
commit 4e38d0e359
4 changed files with 39 additions and 77 deletions

View File

@ -3,7 +3,7 @@
{ {
"name": "Encryption data", "name": "Encryption data",
"summary": "Store accounts and credentials encrypted by environment", "summary": "Store accounts and credentials encrypted by environment",
"version": "12.0.1.0.0", "version": "13.0.1.0.0",
"development_status": "Alpha", "development_status": "Alpha",
"category": "Tools", "category": "Tools",
"website": "https://github/oca/server-env", "website": "https://github/oca/server-env",
@ -11,7 +11,7 @@
"license": "AGPL-3", "license": "AGPL-3",
"application": False, "application": False,
"installable": True, "installable": True,
"external_dependencies": {"python": ["cryptography"],}, "external_dependencies": {"python": ["cryptography"]},
"depends": ["base",], "depends": ["base"],
"data": ["security/ir.model.access.csv",], "data": ["security/ir.model.access.csv"],
} }

View File

@ -1,12 +1,12 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
import json import json
import logging
from odoo import api, fields, models from odoo import api, fields, models
from odoo.exceptions import AccessError, ValidationError from odoo.exceptions import AccessError, ValidationError
from odoo.tools import ormcache
from odoo.tools.config import config from odoo.tools.config import config
from odoo.tools.translate import _ from odoo.tools.translate import _
from odoo.tools import ormcache
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
@ -22,15 +22,11 @@ class EncryptedData(models.Model):
_name = "encrypted.data" _name = "encrypted.data"
_description = "Store any encrypted data by environment" _description = "Store any encrypted data by environment"
name = fields.Char( name = fields.Char(required=True, readonly=True, index=True, help="Technical name")
required=True, readonly=True, index=True, help="Technical name"
)
environment = fields.Char( environment = fields.Char(
required=True, required=True, index=True, help="Concerned Odoo environment (prod, preprod...)",
index=True,
help="Concerned Odoo environment (prod, preprod...)",
) )
encrypted_data = fields.Binary() encrypted_data = fields.Binary(attachment=False)
_sql_constraints = [ _sql_constraints = [
( (
@ -60,15 +56,13 @@ class EncryptedData(models.Model):
def _encrypted_get(self, name, env=None): def _encrypted_get(self, name, env=None):
if self.env.context.get("bin_size"): if self.env.context.get("bin_size"):
self = self.with_context(bin_size=False) self = self.with_context(bin_size=False)
if not self.env.user._is_superuser(): if not self.env.su:
raise AccessError( raise AccessError(
_("Encrypted data can only be read as superuser") _("Encrypted data can only be read with suspended security (sudo)")
) )
if not env: if not env:
env = self._retrieve_env() env = self._retrieve_env()
encrypted_rec = self.search( encrypted_rec = self.search([("name", "=", name), ("environment", "=", env)])
[("name", "=", name), ("environment", "=", env)]
)
if not encrypted_rec: if not encrypted_rec:
return None return None
return encrypted_rec._decrypt_data(env) return encrypted_rec._decrypt_data(env)
@ -111,10 +105,7 @@ class EncryptedData(models.Model):
key_str = config.get(key_name) key_str = config.get(key_name)
if not key_str: if not key_str:
raise ValidationError( raise ValidationError(
_( _("No '%s' entry found in config file. " "Use a key similar to: %s")
"No '%s' entry found in config file. "
"Use a key similar to: %s"
)
% (key_name, Fernet.generate_key()) % (key_name, Fernet.generate_key())
) )
# key should be in bytes format # key should be in bytes format
@ -130,23 +121,19 @@ class EncryptedData(models.Model):
@api.model @api.model
def _encrypted_store(self, name, data, env=None): def _encrypted_store(self, name, data, env=None):
if not self.env.user._is_superuser(): if not self.env.su:
raise AccessError(_("You can only encrypt data as superuser")) raise AccessError(
_("You can only encrypt data with suspended security (sudo)")
)
if not env: if not env:
env = self._retrieve_env() env = self._retrieve_env()
encrypted_data = self._encrypt_data(data, env) encrypted_data = self._encrypt_data(data, env)
existing_data = self.search( existing_data = self.search([("name", "=", name), ("environment", "=", env)])
[("name", "=", name), ("environment", "=", env)]
)
if existing_data: if existing_data:
existing_data.write({"encrypted_data": encrypted_data}) existing_data.write({"encrypted_data": encrypted_data})
else: else:
self.create( self.create(
{ {"name": name, "environment": env, "encrypted_data": encrypted_data}
"name": name,
"environment": env,
"encrypted_data": encrypted_data,
}
) )
self._encrypted_get.clear_cache(self) self._encrypted_get.clear_cache(self)
self._encrypted_read_json.clear_cache(self) self._encrypted_read_json.clear_cache(self)

View File

@ -1,11 +1,10 @@
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import logging
from odoo.tests.common import TransactionCase from odoo.tests.common import TransactionCase
from odoo.tools.config import config from odoo.tools.config import config
import logging
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
try: try:

View File

@ -1,13 +1,13 @@
# © 2016 Akretion Raphaël REVERDY # © 2016 Akretion Raphaël REVERDY
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from .common import CommonDataEncrypted
from odoo.tools.config import config
from odoo.exceptions import AccessError, ValidationError
import logging import logging
from odoo.exceptions import AccessError, ValidationError
from odoo.tools.config import config
from .common import CommonDataEncrypted
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
try: try:
@ -21,7 +21,7 @@ class TestDataEncrypted(CommonDataEncrypted):
# only superuser can use this model # only superuser can use this model
admin = self.env.ref("base.user_admin") admin = self.env.ref("base.user_admin")
with self.assertRaises(AccessError): with self.assertRaises(AccessError):
self.encrypted_data.sudo(admin.id)._encrypted_store( self.encrypted_data.with_user(admin.id)._encrypted_store(
self.crypted_data_name, "My config" self.crypted_data_name, "My config"
) )
@ -40,74 +40,50 @@ class TestDataEncrypted(CommonDataEncrypted):
) )
def test_get_data_decrypted_and_cache(self): def test_get_data_decrypted_and_cache(self):
self.encrypted_data.sudo()._encrypted_store( self.encrypted_data.sudo()._encrypted_store("test_model,1", "My config")
"test_model,1", "My config" data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
)
data = self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name
)
self.assertEqual(data, "My config") self.assertEqual(data, "My config")
# Test cache really depends on user (super user) else any user could # Test cache really depends on user (super user) else any user could
# access the data # access the data
admin = self.env.ref("base.user_admin") admin = self.env.ref("base.user_admin")
with self.assertRaises(AccessError): with self.assertRaises(AccessError):
self.encrypted_data.sudo(admin)._encrypted_get( self.encrypted_data.with_user(admin)._encrypted_get(self.crypted_data_name)
self.crypted_data_name
)
# Change value should invalidate cache # Change value should invalidate cache
self.encrypted_data.sudo()._encrypted_store( self.encrypted_data.sudo()._encrypted_store("test_model,1", "Other Config")
"test_model,1", "Other Config" new_data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
)
new_data = self.encrypted_data.sudo()._encrypted_get(
self.crypted_data_name
)
self.assertEqual(new_data, "Other Config") self.assertEqual(new_data, "Other Config")
def test_get_data_wrong_key(self): def test_get_data_wrong_key(self):
self.encrypted_data.sudo()._encrypted_store( self.encrypted_data.sudo()._encrypted_store("test_model,1", "My config")
"test_model,1", "My config"
)
new_key = Fernet.generate_key() new_key = Fernet.generate_key()
config["encryption_key_test"] = new_key.decode() config["encryption_key_test"] = new_key.decode()
with self.assertRaises(ValidationError): with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name) self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
def test_get_empty_data(self): def test_get_empty_data(self):
empty_data = self.encrypted_data.sudo()._encrypted_get( empty_data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
self.crypted_data_name
)
self.assertEqual(empty_data, None) self.assertEqual(empty_data, None)
def test_get_wrong_json(self): def test_get_wrong_json(self):
self.encrypted_data.sudo()._encrypted_store( self.encrypted_data.sudo()._encrypted_store(self.crypted_data_name, "config")
self.crypted_data_name, "config"
)
with self.assertRaises(ValidationError): with self.assertRaises(ValidationError):
self.encrypted_data.sudo()._encrypted_read_json( self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
self.crypted_data_name
)
def test_get_good_json(self): def test_get_good_json(self):
self.encrypted_data.sudo()._encrypted_store_json( self.encrypted_data.sudo()._encrypted_store_json(
self.crypted_data_name, {"key": "value"} self.crypted_data_name, {"key": "value"}
) )
data = self.encrypted_data.sudo()._encrypted_read_json( data = self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
self.crypted_data_name
)
self.assertEqual(data, {"key": "value"}) self.assertEqual(data, {"key": "value"})
def test_get_empty_json(self): def test_get_empty_json(self):
data = self.encrypted_data.sudo()._encrypted_read_json( data = self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
self.crypted_data_name
)
self.assertEqual(data, {}) self.assertEqual(data, {})
def test_get_data_with_bin_size_context(self): def test_get_data_with_bin_size_context(self):
self.encrypted_data.sudo()._encrypted_store( self.encrypted_data.sudo()._encrypted_store(self.crypted_data_name, "test")
self.crypted_data_name, "test"
)
data = ( data = (
self.encrypted_data.sudo() self.encrypted_data.sudo()
.with_context(bin_size=True) .with_context(bin_size=True)