[MIG] data_encryption: Migration to 13.0
This commit is contained in:
parent
df274a2e41
commit
2e3f0e6b99
|
|
@ -3,7 +3,7 @@
|
||||||
{
|
{
|
||||||
"name": "Encryption data",
|
"name": "Encryption data",
|
||||||
"summary": "Store accounts and credentials encrypted by environment",
|
"summary": "Store accounts and credentials encrypted by environment",
|
||||||
"version": "12.0.1.0.0",
|
"version": "13.0.1.0.0",
|
||||||
"development_status": "Alpha",
|
"development_status": "Alpha",
|
||||||
"category": "Tools",
|
"category": "Tools",
|
||||||
"website": "https://github/oca/server-env",
|
"website": "https://github/oca/server-env",
|
||||||
|
|
@ -11,7 +11,7 @@
|
||||||
"license": "AGPL-3",
|
"license": "AGPL-3",
|
||||||
"application": False,
|
"application": False,
|
||||||
"installable": True,
|
"installable": True,
|
||||||
"external_dependencies": {"python": ["cryptography"],},
|
"external_dependencies": {"python": ["cryptography"]},
|
||||||
"depends": ["base",],
|
"depends": ["base"],
|
||||||
"data": ["security/ir.model.access.csv",],
|
"data": ["security/ir.model.access.csv"],
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,12 @@
|
||||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
import logging
|
|
||||||
import json
|
import json
|
||||||
|
import logging
|
||||||
|
|
||||||
from odoo import api, fields, models
|
from odoo import api, fields, models
|
||||||
from odoo.exceptions import AccessError, ValidationError
|
from odoo.exceptions import AccessError, ValidationError
|
||||||
|
from odoo.tools import ormcache
|
||||||
from odoo.tools.config import config
|
from odoo.tools.config import config
|
||||||
from odoo.tools.translate import _
|
from odoo.tools.translate import _
|
||||||
from odoo.tools import ormcache
|
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
@ -22,15 +22,11 @@ class EncryptedData(models.Model):
|
||||||
_name = "encrypted.data"
|
_name = "encrypted.data"
|
||||||
_description = "Store any encrypted data by environment"
|
_description = "Store any encrypted data by environment"
|
||||||
|
|
||||||
name = fields.Char(
|
name = fields.Char(required=True, readonly=True, index=True, help="Technical name")
|
||||||
required=True, readonly=True, index=True, help="Technical name"
|
|
||||||
)
|
|
||||||
environment = fields.Char(
|
environment = fields.Char(
|
||||||
required=True,
|
required=True, index=True, help="Concerned Odoo environment (prod, preprod...)",
|
||||||
index=True,
|
|
||||||
help="Concerned Odoo environment (prod, preprod...)",
|
|
||||||
)
|
)
|
||||||
encrypted_data = fields.Binary()
|
encrypted_data = fields.Binary(attachment=False)
|
||||||
|
|
||||||
_sql_constraints = [
|
_sql_constraints = [
|
||||||
(
|
(
|
||||||
|
|
@ -60,15 +56,13 @@ class EncryptedData(models.Model):
|
||||||
def _encrypted_get(self, name, env=None):
|
def _encrypted_get(self, name, env=None):
|
||||||
if self.env.context.get("bin_size"):
|
if self.env.context.get("bin_size"):
|
||||||
self = self.with_context(bin_size=False)
|
self = self.with_context(bin_size=False)
|
||||||
if not self.env.user._is_superuser():
|
if not self.env.su:
|
||||||
raise AccessError(
|
raise AccessError(
|
||||||
_("Encrypted data can only be read as superuser")
|
_("Encrypted data can only be read with suspended security (sudo)")
|
||||||
)
|
)
|
||||||
if not env:
|
if not env:
|
||||||
env = self._retrieve_env()
|
env = self._retrieve_env()
|
||||||
encrypted_rec = self.search(
|
encrypted_rec = self.search([("name", "=", name), ("environment", "=", env)])
|
||||||
[("name", "=", name), ("environment", "=", env)]
|
|
||||||
)
|
|
||||||
if not encrypted_rec:
|
if not encrypted_rec:
|
||||||
return None
|
return None
|
||||||
return encrypted_rec._decrypt_data(env)
|
return encrypted_rec._decrypt_data(env)
|
||||||
|
|
@ -111,10 +105,7 @@ class EncryptedData(models.Model):
|
||||||
key_str = config.get(key_name)
|
key_str = config.get(key_name)
|
||||||
if not key_str:
|
if not key_str:
|
||||||
raise ValidationError(
|
raise ValidationError(
|
||||||
_(
|
_("No '%s' entry found in config file. " "Use a key similar to: %s")
|
||||||
"No '%s' entry found in config file. "
|
|
||||||
"Use a key similar to: %s"
|
|
||||||
)
|
|
||||||
% (key_name, Fernet.generate_key())
|
% (key_name, Fernet.generate_key())
|
||||||
)
|
)
|
||||||
# key should be in bytes format
|
# key should be in bytes format
|
||||||
|
|
@ -130,23 +121,19 @@ class EncryptedData(models.Model):
|
||||||
|
|
||||||
@api.model
|
@api.model
|
||||||
def _encrypted_store(self, name, data, env=None):
|
def _encrypted_store(self, name, data, env=None):
|
||||||
if not self.env.user._is_superuser():
|
if not self.env.su:
|
||||||
raise AccessError(_("You can only encrypt data as superuser"))
|
raise AccessError(
|
||||||
|
_("You can only encrypt data with suspended security (sudo)")
|
||||||
|
)
|
||||||
if not env:
|
if not env:
|
||||||
env = self._retrieve_env()
|
env = self._retrieve_env()
|
||||||
encrypted_data = self._encrypt_data(data, env)
|
encrypted_data = self._encrypt_data(data, env)
|
||||||
existing_data = self.search(
|
existing_data = self.search([("name", "=", name), ("environment", "=", env)])
|
||||||
[("name", "=", name), ("environment", "=", env)]
|
|
||||||
)
|
|
||||||
if existing_data:
|
if existing_data:
|
||||||
existing_data.write({"encrypted_data": encrypted_data})
|
existing_data.write({"encrypted_data": encrypted_data})
|
||||||
else:
|
else:
|
||||||
self.create(
|
self.create(
|
||||||
{
|
{"name": name, "environment": env, "encrypted_data": encrypted_data}
|
||||||
"name": name,
|
|
||||||
"environment": env,
|
|
||||||
"encrypted_data": encrypted_data,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
self._encrypted_get.clear_cache(self)
|
self._encrypted_get.clear_cache(self)
|
||||||
self._encrypted_read_json.clear_cache(self)
|
self._encrypted_read_json.clear_cache(self)
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,10 @@
|
||||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
from odoo.tests.common import TransactionCase
|
from odoo.tests.common import TransactionCase
|
||||||
from odoo.tools.config import config
|
from odoo.tools.config import config
|
||||||
|
|
||||||
|
|
||||||
import logging
|
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,13 @@
|
||||||
# © 2016 Akretion Raphaël REVERDY
|
# © 2016 Akretion Raphaël REVERDY
|
||||||
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
|
||||||
|
|
||||||
from .common import CommonDataEncrypted
|
|
||||||
from odoo.tools.config import config
|
|
||||||
from odoo.exceptions import AccessError, ValidationError
|
|
||||||
|
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from odoo.exceptions import AccessError, ValidationError
|
||||||
|
from odoo.tools.config import config
|
||||||
|
|
||||||
|
from .common import CommonDataEncrypted
|
||||||
|
|
||||||
_logger = logging.getLogger(__name__)
|
_logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|
@ -21,7 +21,7 @@ class TestDataEncrypted(CommonDataEncrypted):
|
||||||
# only superuser can use this model
|
# only superuser can use this model
|
||||||
admin = self.env.ref("base.user_admin")
|
admin = self.env.ref("base.user_admin")
|
||||||
with self.assertRaises(AccessError):
|
with self.assertRaises(AccessError):
|
||||||
self.encrypted_data.sudo(admin.id)._encrypted_store(
|
self.encrypted_data.with_user(admin.id)._encrypted_store(
|
||||||
self.crypted_data_name, "My config"
|
self.crypted_data_name, "My config"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -40,74 +40,50 @@ class TestDataEncrypted(CommonDataEncrypted):
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_get_data_decrypted_and_cache(self):
|
def test_get_data_decrypted_and_cache(self):
|
||||||
self.encrypted_data.sudo()._encrypted_store(
|
self.encrypted_data.sudo()._encrypted_store("test_model,1", "My config")
|
||||||
"test_model,1", "My config"
|
data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
|
||||||
)
|
|
||||||
data = self.encrypted_data.sudo()._encrypted_get(
|
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
self.assertEqual(data, "My config")
|
self.assertEqual(data, "My config")
|
||||||
|
|
||||||
# Test cache really depends on user (super user) else any user could
|
# Test cache really depends on user (super user) else any user could
|
||||||
# access the data
|
# access the data
|
||||||
admin = self.env.ref("base.user_admin")
|
admin = self.env.ref("base.user_admin")
|
||||||
with self.assertRaises(AccessError):
|
with self.assertRaises(AccessError):
|
||||||
self.encrypted_data.sudo(admin)._encrypted_get(
|
self.encrypted_data.with_user(admin)._encrypted_get(self.crypted_data_name)
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
|
|
||||||
# Change value should invalidate cache
|
# Change value should invalidate cache
|
||||||
self.encrypted_data.sudo()._encrypted_store(
|
self.encrypted_data.sudo()._encrypted_store("test_model,1", "Other Config")
|
||||||
"test_model,1", "Other Config"
|
new_data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
|
||||||
)
|
|
||||||
new_data = self.encrypted_data.sudo()._encrypted_get(
|
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
self.assertEqual(new_data, "Other Config")
|
self.assertEqual(new_data, "Other Config")
|
||||||
|
|
||||||
def test_get_data_wrong_key(self):
|
def test_get_data_wrong_key(self):
|
||||||
self.encrypted_data.sudo()._encrypted_store(
|
self.encrypted_data.sudo()._encrypted_store("test_model,1", "My config")
|
||||||
"test_model,1", "My config"
|
|
||||||
)
|
|
||||||
new_key = Fernet.generate_key()
|
new_key = Fernet.generate_key()
|
||||||
config["encryption_key_test"] = new_key.decode()
|
config["encryption_key_test"] = new_key.decode()
|
||||||
with self.assertRaises(ValidationError):
|
with self.assertRaises(ValidationError):
|
||||||
self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
|
self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
|
||||||
|
|
||||||
def test_get_empty_data(self):
|
def test_get_empty_data(self):
|
||||||
empty_data = self.encrypted_data.sudo()._encrypted_get(
|
empty_data = self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name)
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
self.assertEqual(empty_data, None)
|
self.assertEqual(empty_data, None)
|
||||||
|
|
||||||
def test_get_wrong_json(self):
|
def test_get_wrong_json(self):
|
||||||
self.encrypted_data.sudo()._encrypted_store(
|
self.encrypted_data.sudo()._encrypted_store(self.crypted_data_name, "config")
|
||||||
self.crypted_data_name, "config"
|
|
||||||
)
|
|
||||||
with self.assertRaises(ValidationError):
|
with self.assertRaises(ValidationError):
|
||||||
self.encrypted_data.sudo()._encrypted_read_json(
|
self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_get_good_json(self):
|
def test_get_good_json(self):
|
||||||
self.encrypted_data.sudo()._encrypted_store_json(
|
self.encrypted_data.sudo()._encrypted_store_json(
|
||||||
self.crypted_data_name, {"key": "value"}
|
self.crypted_data_name, {"key": "value"}
|
||||||
)
|
)
|
||||||
data = self.encrypted_data.sudo()._encrypted_read_json(
|
data = self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
self.assertEqual(data, {"key": "value"})
|
self.assertEqual(data, {"key": "value"})
|
||||||
|
|
||||||
def test_get_empty_json(self):
|
def test_get_empty_json(self):
|
||||||
data = self.encrypted_data.sudo()._encrypted_read_json(
|
data = self.encrypted_data.sudo()._encrypted_read_json(self.crypted_data_name)
|
||||||
self.crypted_data_name
|
|
||||||
)
|
|
||||||
self.assertEqual(data, {})
|
self.assertEqual(data, {})
|
||||||
|
|
||||||
def test_get_data_with_bin_size_context(self):
|
def test_get_data_with_bin_size_context(self):
|
||||||
self.encrypted_data.sudo()._encrypted_store(
|
self.encrypted_data.sudo()._encrypted_store(self.crypted_data_name, "test")
|
||||||
self.crypted_data_name, "test"
|
|
||||||
)
|
|
||||||
data = (
|
data = (
|
||||||
self.encrypted_data.sudo()
|
self.encrypted_data.sudo()
|
||||||
.with_context(bin_size=True)
|
.with_context(bin_size=True)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue