From df274a2e4125dde7d3a05b92cddf0f5264442284 Mon Sep 17 00:00:00 2001 From: Thomas Binsfeld Date: Fri, 2 Oct 2020 11:21:36 +0200 Subject: [PATCH] [REF] data_encryption: Black python code --- data_encryption/__manifest__.py | 15 +--- data_encryption/models/encrypted_data.py | 90 +++++++++++++--------- data_encryption/tests/common.py | 16 ++-- data_encryption/tests/test_data_encrypt.py | 71 ++++++++++------- 4 files changed, 109 insertions(+), 83 deletions(-) diff --git a/data_encryption/__manifest__.py b/data_encryption/__manifest__.py index 501f45a..292f6f0 100644 --- a/data_encryption/__manifest__.py +++ b/data_encryption/__manifest__.py @@ -4,21 +4,14 @@ "name": "Encryption data", "summary": "Store accounts and credentials encrypted by environment", "version": "12.0.1.0.0", - "development_status": 'Alpha', + "development_status": "Alpha", "category": "Tools", "website": "https://github/oca/server-env", "author": "Akretion, Odoo Community Association (OCA)", "license": "AGPL-3", "application": False, "installable": True, - "external_dependencies": { - "python": [ - 'cryptography'], - }, - "depends": [ - "base", - ], - "data": [ - "security/ir.model.access.csv", - ], + "external_dependencies": {"python": ["cryptography"],}, + "depends": ["base",], + "data": ["security/ir.model.access.csv",], } diff --git a/data_encryption/models/encrypted_data.py b/data_encryption/models/encrypted_data.py index 5e103b6..3353c53 100644 --- a/data_encryption/models/encrypted_data.py +++ b/data_encryption/models/encrypted_data.py @@ -19,22 +19,26 @@ except ImportError as err: # pragma: no cover class EncryptedData(models.Model): """Model to store encrypted data by environment (prod, preprod...)""" - _name = 'encrypted.data' - _description = 'Store any encrypted data by environment' + _name = "encrypted.data" + _description = "Store any encrypted data by environment" name = fields.Char( - required=True, readonly=True, index=True, - help="Technical name") + required=True, readonly=True, index=True, help="Technical name" + ) environment = fields.Char( required=True, index=True, - help="Concerned Odoo environment (prod, preprod...)") + help="Concerned Odoo environment (prod, preprod...)", + ) encrypted_data = fields.Binary() _sql_constraints = [ - ('name_environment_uniq', 'unique (name, environment)', - 'You can not store multiple encrypted data for the same record and \ - environment') + ( + "name_environment_uniq", + "unique (name, environment)", + "You can not store multiple encrypted data for the same record and \ + environment", + ) ] def _decrypt_data(self, env): @@ -43,29 +47,34 @@ class EncryptedData(models.Model): try: return cipher.decrypt(self.encrypted_data).decode() except InvalidToken: - raise ValidationError(_( - "Password has been encrypted with a different " - "key. Unless you can recover the previous key, " - "this password is unreadable.")) + raise ValidationError( + _( + "Password has been encrypted with a different " + "key. Unless you can recover the previous key, " + "this password is unreadable." + ) + ) @api.model - @ormcache('self._uid', 'name', 'env') + @ormcache("self._uid", "name", "env") def _encrypted_get(self, name, env=None): - if self.env.context.get('bin_size'): + if self.env.context.get("bin_size"): self = self.with_context(bin_size=False) if not self.env.user._is_superuser(): raise AccessError( - _("Encrypted data can only be read as superuser")) + _("Encrypted data can only be read as superuser") + ) if not env: env = self._retrieve_env() encrypted_rec = self.search( - [('name', '=', name), ('environment', '=', env)]) + [("name", "=", name), ("environment", "=", env)] + ) if not encrypted_rec: return None return encrypted_rec._decrypt_data(env) @api.model - @ormcache('self._uid', 'name', 'env') + @ormcache("self._uid", "name", "env") def _encrypted_read_json(self, name, env=None): data = self._encrypted_get(name, env=env) if not data: @@ -74,18 +83,22 @@ class EncryptedData(models.Model): return json.loads(data) except (ValueError, TypeError): raise ValidationError( - _("The data you are trying to read are not in a json format")) + _("The data you are trying to read are not in a json format") + ) @staticmethod def _retrieve_env(): """Return the current environment Raise if none is found """ - current = config.get('running_env', False) + current = config.get("running_env", False) if not current: raise ValidationError( - _('No environment found, please check your running_env ' - 'entry in your config file.')) + _( + "No environment found, please check your running_env " + "entry in your config file." + ) + ) return current @classmethod @@ -94,12 +107,15 @@ class EncryptedData(models.Model): force_env = name of the env key. Useful for encoding against one precise env """ - key_name = 'encryption_key_%s' % env + key_name = "encryption_key_%s" % env key_str = config.get(key_name) if not key_str: - raise ValidationError(_( - "No '%s' entry found in config file. " - "Use a key similar to: %s") % (key_name, Fernet.generate_key()) + raise ValidationError( + _( + "No '%s' entry found in config file. " + "Use a key similar to: %s" + ) + % (key_name, Fernet.generate_key()) ) # key should be in bytes format key = key_str.encode() @@ -110,26 +126,28 @@ class EncryptedData(models.Model): cipher = self._get_cipher(env) if not isinstance(data, bytes): data = data.encode() - return cipher.encrypt(data or '') + return cipher.encrypt(data or "") @api.model - def _encrypted_store(self, name, data, env=None): + def _encrypted_store(self, name, data, env=None): if not self.env.user._is_superuser(): - raise AccessError( - _("You can only encrypt data as superuser")) + raise AccessError(_("You can only encrypt data as superuser")) if not env: env = self._retrieve_env() encrypted_data = self._encrypt_data(data, env) existing_data = self.search( - [('name', '=', name), ('environment', '=', env)]) + [("name", "=", name), ("environment", "=", env)] + ) if existing_data: - existing_data.write({'encrypted_data': encrypted_data}) + existing_data.write({"encrypted_data": encrypted_data}) else: - self.create({ - 'name': name, - 'environment': env, - 'encrypted_data': encrypted_data, - }) + self.create( + { + "name": name, + "environment": env, + "encrypted_data": encrypted_data, + } + ) self._encrypted_get.clear_cache(self) self._encrypted_read_json.clear_cache(self) diff --git a/data_encryption/tests/common.py b/data_encryption/tests/common.py index f044569..0ab30b8 100644 --- a/data_encryption/tests/common.py +++ b/data_encryption/tests/common.py @@ -15,15 +15,14 @@ except ImportError as err: # pragma: no cover class CommonDataEncrypted(TransactionCase): - def setUp(self): super().setUp() - self.encrypted_data = self.env['encrypted.data'] - self.set_new_key_env('test') - self.old_running_env = config.get('running_env', '') - config['running_env'] = 'test' - self.crypted_data_name = 'test_model,1' + self.encrypted_data = self.env["encrypted.data"] + self.set_new_key_env("test") + self.old_running_env = config.get("running_env", "") + config["running_env"] = "test" + self.crypted_data_name = "test_model,1" def set_new_key_env(self, environment): crypting_key = Fernet.generate_key() @@ -31,9 +30,8 @@ class CommonDataEncrypted(TransactionCase): # the key com from the config file and is not in a binary format. # So we decode here to avoid having a special behavior because of # the tests. - config['encryption_key_{}'.format(environment)] = \ - crypting_key.decode() + config["encryption_key_{}".format(environment)] = crypting_key.decode() def tearDown(self): - config['running_env'] = self.old_running_env + config["running_env"] = self.old_running_env return super().tearDown() diff --git a/data_encryption/tests/test_data_encrypt.py b/data_encryption/tests/test_data_encrypt.py index 6e42722..1cbb997 100644 --- a/data_encryption/tests/test_data_encrypt.py +++ b/data_encryption/tests/test_data_encrypt.py @@ -17,83 +17,100 @@ except ImportError as err: # pragma: no cover class TestDataEncrypted(CommonDataEncrypted): - def test_store_data_no_superuser(self): # only superuser can use this model - admin = self.env.ref('base.user_admin') + admin = self.env.ref("base.user_admin") with self.assertRaises(AccessError): self.encrypted_data.sudo(admin.id)._encrypted_store( - self.crypted_data_name, "My config") + self.crypted_data_name, "My config" + ) def test_store_data_noenv_set(self): - config.pop('running_env', None) + config.pop("running_env", None) with self.assertRaises(ValidationError): self.encrypted_data.sudo()._encrypted_store( - self.crypted_data_name, "My config") + self.crypted_data_name, "My config" + ) def test_store_data_nokey_set(self): - config.pop('encryption_key_test', None) + config.pop("encryption_key_test", None) with self.assertRaises(ValidationError): self.encrypted_data.sudo()._encrypted_store( - self.crypted_data_name, "My config") + self.crypted_data_name, "My config" + ) def test_get_data_decrypted_and_cache(self): self.encrypted_data.sudo()._encrypted_store( - 'test_model,1', "My config") + "test_model,1", "My config" + ) data = self.encrypted_data.sudo()._encrypted_get( - self.crypted_data_name) + self.crypted_data_name + ) self.assertEqual(data, "My config") # Test cache really depends on user (super user) else any user could # access the data - admin = self.env.ref('base.user_admin') + admin = self.env.ref("base.user_admin") with self.assertRaises(AccessError): self.encrypted_data.sudo(admin)._encrypted_get( - self.crypted_data_name) + self.crypted_data_name + ) # Change value should invalidate cache self.encrypted_data.sudo()._encrypted_store( - 'test_model,1', "Other Config") + "test_model,1", "Other Config" + ) new_data = self.encrypted_data.sudo()._encrypted_get( - self.crypted_data_name) + self.crypted_data_name + ) self.assertEqual(new_data, "Other Config") def test_get_data_wrong_key(self): self.encrypted_data.sudo()._encrypted_store( - 'test_model,1', "My config") + "test_model,1", "My config" + ) new_key = Fernet.generate_key() - config['encryption_key_test'] = new_key.decode() + config["encryption_key_test"] = new_key.decode() with self.assertRaises(ValidationError): - self.encrypted_data.sudo()._encrypted_get( - self.crypted_data_name) + self.encrypted_data.sudo()._encrypted_get(self.crypted_data_name) def test_get_empty_data(self): empty_data = self.encrypted_data.sudo()._encrypted_get( - self.crypted_data_name) + self.crypted_data_name + ) self.assertEqual(empty_data, None) def test_get_wrong_json(self): self.encrypted_data.sudo()._encrypted_store( - self.crypted_data_name, 'config') + self.crypted_data_name, "config" + ) with self.assertRaises(ValidationError): self.encrypted_data.sudo()._encrypted_read_json( - self.crypted_data_name) + self.crypted_data_name + ) def test_get_good_json(self): self.encrypted_data.sudo()._encrypted_store_json( - self.crypted_data_name, {'key': 'value'}) + self.crypted_data_name, {"key": "value"} + ) data = self.encrypted_data.sudo()._encrypted_read_json( - self.crypted_data_name) - self.assertEqual(data, {'key': 'value'}) + self.crypted_data_name + ) + self.assertEqual(data, {"key": "value"}) def test_get_empty_json(self): data = self.encrypted_data.sudo()._encrypted_read_json( - self.crypted_data_name) + self.crypted_data_name + ) self.assertEqual(data, {}) def test_get_data_with_bin_size_context(self): self.encrypted_data.sudo()._encrypted_store( - self.crypted_data_name, "test") - data = self.encrypted_data.sudo().with_context(bin_size=True).\ - _encrypted_get(self.crypted_data_name) + self.crypted_data_name, "test" + ) + data = ( + self.encrypted_data.sudo() + .with_context(bin_size=True) + ._encrypted_get(self.crypted_data_name) + ) self.assertEqual(data, "test")