From c4d82337301d69f0eaa30cee0f05bab26e4ce940 Mon Sep 17 00:00:00 2001 From: Donovan Date: Wed, 17 Jul 2024 08:43:45 -0500 Subject: [PATCH] rename functions and attributes --- nkode_api.py | 37 ++++----- nkode_demo.ipynb | 91 ++++++++++++++++++++++- src/{customer_db_model.py => customer.py} | 25 ++++++- src/customer_interface.py | 16 ++-- src/models.py | 8 ++ src/user_cipher_keys.py | 6 +- src/user_db_model.py | 4 +- src/user_interface.py | 36 ++++----- test/test_nkode_api.py | 32 ++++---- test/test_nkode_interface.py | 11 ++- test/test_user_cipher_keys.py | 6 +- test/test_user_interface.py | 2 +- 12 files changed, 195 insertions(+), 79 deletions(-) rename src/{customer_db_model.py => customer.py} (67%) diff --git a/nkode_api.py b/nkode_api.py index 14260d6..cee8e98 100644 --- a/nkode_api.py +++ b/nkode_api.py @@ -1,7 +1,8 @@ from uuid import UUID, uuid4 from pydantic import BaseModel -from src.customer_db_model import CustomerDBModel +from src.customer import Customer +from src.models import NKodePolicy from src.session_cache import SessionCacheModel from src.user_cipher_keys import UserCipherKeys from src.user_db_model import UserDBModel @@ -10,13 +11,13 @@ from src.customer_interface import CustomerInterface class NKodeAPI(BaseModel): - customers: dict[UUID, CustomerDBModel] = {} + customers: dict[UUID, Customer] = {} sessions: dict[UUID, SessionCacheModel] = {} def generate_index_interface(self, customer_id: UUID) -> tuple[UUID, list[int]]: assert (customer_id in self.customers.keys()) customer = self.customers[customer_id] - set_interface = UserInterface.new_interface(customer.interface.attrs_per_key, customer.interface.numb_keys) + set_interface = UserInterface.new(customer.interface.attrs_per_key, customer.interface.numb_of_keys) new_session = SessionCacheModel( session_id=uuid4(), set_interface=set_interface.interface_index, @@ -45,13 +46,13 @@ class NKodeAPI(BaseModel): assert (session_id in self.sessions.keys()) session = self.sessions[session_id] assert (customer_id == session.customer_id) - numb_of_keys = customer.interface.numb_keys + numb_of_keys = customer.interface.numb_of_keys attrs_per_key = customer.interface.attrs_per_key assert (all(0 <= key <= numb_of_keys for key in key_selection)) set_interface = UserInterface( interface_index=session.set_interface, - numb_sets=attrs_per_key, - numb_keys=numb_of_keys, + attrs_per_key=attrs_per_key, + numb_of_keys=numb_of_keys, ) set_interface.disperse_interface() session.username = username @@ -60,7 +61,7 @@ class NKodeAPI(BaseModel): self.sessions[session_id] = session return self.sessions[session_id].confirm_interface - def confirm_nkode(self, username: str, customer_id: UUID, confirm_key_entry: list[int], session_id: UUID): + def confirm_nkode(self, username: str, customer_id: UUID, confirm_key_entry: list[int], session_id: UUID) -> bool: assert ( session_id in self.sessions.keys() and customer_id == self.sessions[session_id].customer_id and @@ -68,12 +69,14 @@ class NKodeAPI(BaseModel): ) session = self.sessions[session_id] customer = self.customers[customer_id] - numb_of_keys = customer.interface.numb_keys + numb_of_keys = customer.interface.numb_of_keys attrs_per_key = customer.interface.attrs_per_key assert (all(0 <= key <= numb_of_keys for key in confirm_key_entry)) passcode = session.deduce_passcode(attrs_per_key, confirm_key_entry) set_values = customer.interface.set_vals - new_user_keys = UserCipherKeys.new_user_encipher_keys(numb_of_keys, attrs_per_key, set_values) + if not customer.valid_new_nkode(passcode): + return False + new_user_keys = UserCipherKeys.new(numb_of_keys, attrs_per_key, set_values) enciphered_passcode = new_user_keys.encipher_nkode(passcode, customer.interface) new_user = UserDBModel( username=username, @@ -81,15 +84,14 @@ class NKodeAPI(BaseModel): user_keys=new_user_keys, user_interface=UserInterface( interface_index=self.sessions[session_id].confirm_interface, - numb_sets=attrs_per_key, - numb_keys=numb_of_keys, + attrs_per_key=attrs_per_key, + numb_of_keys=numb_of_keys, ), ) self.customers[customer_id].add_new_user(new_user) - return "success" + return True # del self.sessions[session_id] - def login(self, customer_id: UUID, username: str, key_selection: list[int]) -> bool: assert (customer_id in self.customers.keys()) customer = self.customers[customer_id] @@ -99,12 +101,13 @@ class NKodeAPI(BaseModel): assert (customer_id in self.customers.keys()) return self.customers[customer_id].renew_keys() - def create_new_customer(self, numb_keys: int, numb_sets: int) -> CustomerDBModel: - new_customer = CustomerDBModel( + def create_new_customer(self, numb_keys: int, numb_sets: int, nkode_policy: NKodePolicy) -> UUID: + new_customer = Customer( customer_id=uuid4(), - interface=CustomerInterface.new_interface(numb_keys, numb_sets), + interface=CustomerInterface.new(numb_keys, numb_sets), users={}, + nkode_policy=nkode_policy ) self.customers[new_customer.customer_id] = new_customer - return new_customer + return new_customer.customer_id diff --git a/nkode_demo.ipynb b/nkode_demo.ipynb index db13099..30d40db 100644 --- a/nkode_demo.ipynb +++ b/nkode_demo.ipynb @@ -2,11 +2,98 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, + "outputs": [], + "source": [ + "from nkode_api import NKodeAPI\n", + "from src.models import NKodePolicy" + ], + "metadata": { + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.756601Z", + "start_time": "2024-07-17T13:17:41.702924Z" + } + } + }, + { + "cell_type": "code", + "execution_count": 2, + "outputs": [], + "source": [ + "api = NKodeAPI()" + ], + "metadata": { + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.759087Z", + "start_time": "2024-07-17T13:17:41.756977Z" + } + } + }, + { + "cell_type": "code", + "execution_count": 3, + "outputs": [], + "source": [ + "policy = NKodePolicy(\n", + " max_nkode_len=10,\n", + " min_nkode_len=4,\n", + " distinct_sets=0,\n", + " distinct_attributes=4\n", + ")\n", + "numb_of_keys = 10\n", + "attrs_per_key = 7 # aka number of sets\n", + "customer_id = api.create_new_customer(numb_of_keys, attrs_per_key, policy)" + ], + "metadata": { + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.886328Z", + "start_time": "2024-07-17T13:17:41.760420Z" + } + } + }, + { + "cell_type": "code", + "execution_count": 4, + "outputs": [], + "source": [ + "customer = api.customers[customer_id]\n", + "customer.interface.customer_interface" + ], + "metadata": { + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.888107Z", + "start_time": "2024-07-17T13:17:41.886565Z" + } + } + }, + { + "cell_type": "code", + "execution_count": 4, "outputs": [], "source": [], "metadata": { - "collapsed": false + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.891074Z", + "start_time": "2024-07-17T13:17:41.889383Z" + } + } + }, + { + "cell_type": "code", + "execution_count": 4, + "outputs": [], + "source": [], + "metadata": { + "collapsed": false, + "ExecuteTime": { + "end_time": "2024-07-17T13:17:41.892614Z", + "start_time": "2024-07-17T13:17:41.891400Z" + } } } ], diff --git a/src/customer_db_model.py b/src/customer.py similarity index 67% rename from src/customer_db_model.py rename to src/customer.py index 585bcce..6ab6a0d 100644 --- a/src/customer_db_model.py +++ b/src/customer.py @@ -1,12 +1,14 @@ from uuid import UUID from pydantic import BaseModel from src.customer_interface import CustomerInterface +from src.models import NKodePolicy from src.user_db_model import UserDBModel from src.utils import xor_lists -class CustomerDBModel(BaseModel): +class Customer(BaseModel): customer_id: UUID + nkode_policy: NKodePolicy interface: CustomerInterface users: dict[str, UserDBModel] @@ -15,7 +17,7 @@ class CustomerDBModel(BaseModel): def valid_key_entry(self, username, selected_keys) -> bool: assert (username in self.users.keys()) - assert (all(key_idx < self.interface.numb_keys for key_idx in selected_keys)) + assert (all(key_idx < self.interface.numb_of_keys for key_idx in selected_keys)) passcode_len = len(selected_keys) user = self.users[username] @@ -43,10 +45,10 @@ class CustomerDBModel(BaseModel): return True def renew_keys(self) -> bool: - attrs_before = self.interface.customer_interface.copy() + attrs_before = self.interface.attr_vals.copy() sets_before = self.interface.set_vals.copy() self.interface.renew_interface() - attrs_after = self.interface.customer_interface + attrs_after = self.interface.attr_vals sets_after = self.interface.set_vals attrs_xor = xor_lists(attrs_after, attrs_before) @@ -54,3 +56,18 @@ class CustomerDBModel(BaseModel): for user in self.users.values(): user.renew_keys(set_xor, attrs_xor) return True + + def valid_new_nkode(self, passcode_attr_idx: list[int]) -> bool: + nkode_len = len(passcode_attr_idx) + passcode_set_values = [ + self.interface.get_attr_set_val(self.interface.attr_vals[attr_idx]) for attr_idx in passcode_attr_idx + ] + distinct_sets = len(set(passcode_set_values)) + distinct_attributes = len(set(passcode_attr_idx)) + if ( + self.nkode_policy.min_nkode_len <= nkode_len <= self.nkode_policy.max_nkode_len and + distinct_sets >= self.nkode_policy.distinct_sets and + distinct_attributes >= self.nkode_policy.distinct_attributes + ): + return True + return False diff --git a/src/customer_interface.py b/src/customer_interface.py index 06d4afd..653fd59 100644 --- a/src/customer_interface.py +++ b/src/customer_interface.py @@ -3,30 +3,30 @@ from src.utils import generate_random_nonrepeating_list class CustomerInterface(BaseModel): - customer_interface: list[int] + attr_vals: list[int] set_vals: list[int] - numb_keys: int + numb_of_keys: int attrs_per_key: int @staticmethod - def new_interface(numb_keys: int, attrs_per_key: int): + def new(numb_keys: int, attrs_per_key: int): assert (numb_keys <= 256) assert (attrs_per_key <= 256) return CustomerInterface( - customer_interface=generate_random_nonrepeating_list(attrs_per_key * numb_keys), + attr_vals=generate_random_nonrepeating_list(attrs_per_key * numb_keys), set_vals=generate_random_nonrepeating_list(attrs_per_key), - numb_keys=numb_keys, + numb_of_keys=numb_keys, attrs_per_key=attrs_per_key, ) def renew_interface(self): - self.customer_interface = generate_random_nonrepeating_list(self.attrs_per_key * self.numb_keys) + self.attr_vals = generate_random_nonrepeating_list(self.attrs_per_key * self.numb_of_keys) self.set_vals = generate_random_nonrepeating_list(self.attrs_per_key) def get_attr_set_val(self, attr: int) -> int: - assert (attr in self.customer_interface) - attr_idx = self.customer_interface.index(attr) + assert (attr in self.attr_vals) + attr_idx = self.attr_vals.index(attr) set_idx = attr_idx % self.attrs_per_key return self.set_vals[set_idx] diff --git a/src/models.py b/src/models.py index e57f805..f5eb97a 100644 --- a/src/models.py +++ b/src/models.py @@ -9,3 +9,11 @@ class NKodeAttribute(BaseModel): class EncipheredNKode(BaseModel): code: str mask: str + + +class NKodePolicy(BaseModel): + max_nkode_len: int = 10 + min_nkode_len: int = 4 + distinct_sets: int = 0 + distinct_attributes: int = 4 + # TODO: bytes_per_attr diff --git a/src/user_cipher_keys.py b/src/user_cipher_keys.py index d08f8b0..4653c59 100644 --- a/src/user_cipher_keys.py +++ b/src/user_cipher_keys.py @@ -18,7 +18,7 @@ class UserCipherKeys(BaseModel): max_nkode_len: int = 10 @staticmethod - def new_user_encipher_keys(numb_of_keys: int, attrs_per_key: int, set_values: list[int]): + def new(numb_of_keys: int, attrs_per_key: int, set_values: list[int]): assert len(set_values) == attrs_per_key set_key = generate_random_nonrepeating_list(attrs_per_key) @@ -65,7 +65,7 @@ class UserCipherKeys(BaseModel): customer_interface: CustomerInterface ) -> EncipheredNKode: - passcode_attrs = [customer_interface.customer_interface[idx] for idx in passcode_attr_idx] + passcode_attrs = [customer_interface.attr_vals[idx] for idx in passcode_attr_idx] passcode_sets = [customer_interface.get_attr_set_val(attr) for attr in passcode_attrs] code = self.encipher_salt_hash_code(passcode_attr_idx, customer_interface) mask = self.encipher_mask(passcode_sets, customer_interface) @@ -80,7 +80,7 @@ class UserCipherKeys(BaseModel): customer_interface: CustomerInterface, ) -> str: passcode_len = len(passcode_attr_idx) - passcode_attrs = [customer_interface.customer_interface[idx] for idx in passcode_attr_idx] + passcode_attrs = [customer_interface.attr_vals[idx] for idx in passcode_attr_idx] passcode_cipher = self.pass_key.copy() diff --git a/src/user_db_model.py b/src/user_db_model.py index 8046694..4bfd3eb 100644 --- a/src/user_db_model.py +++ b/src/user_db_model.py @@ -20,8 +20,8 @@ class UserDBModel(BaseModel): self.user_keys.alpha_key = xor_lists(self.user_keys.alpha_key, attrs_xor) def refresh_passcode(self, passcode_attr_idx: list[int], customer_interface: CustomerInterface): - self.user_keys = UserCipherKeys.new_user_encipher_keys( - customer_interface.numb_keys, + self.user_keys = UserCipherKeys.new( + customer_interface.numb_of_keys, customer_interface.attrs_per_key, customer_interface.set_vals diff --git a/src/user_interface.py b/src/user_interface.py index ae21cf1..c67fa1b 100644 --- a/src/user_interface.py +++ b/src/user_interface.py @@ -5,16 +5,16 @@ from src.utils import list_to_matrix, secure_fisher_yates_shuffle, matrix_to_lis class UserInterface(BaseModel): interface_index: list[int] - numb_sets: int - numb_keys: int + attrs_per_key: int + numb_of_keys: int - @classmethod - def new_interface(cls, numb_sets: int, numb_keys: int): + @staticmethod + def new(attrs_per_key: int, numb_of_keys: int): # Todo: this a hack do a proper random interface interface = UserInterface( - interface_index=list(range(numb_sets*numb_keys)), - numb_sets=numb_sets, - numb_keys=numb_keys, + interface_index=list(range(attrs_per_key * numb_of_keys)), + attrs_per_key=attrs_per_key, + numb_of_keys=numb_of_keys, ) interface.disperse_interface() for _ in range(10): @@ -22,9 +22,9 @@ class UserInterface(BaseModel): return interface def disperse_interface(self): - user_interface_matrix = list_to_matrix(self.interface_index, self.numb_sets) + user_interface_matrix = list_to_matrix(self.interface_index, self.attrs_per_key) shuffled_keys = secure_fisher_yates_shuffle(user_interface_matrix) - dispersed_interface = self._random_attribute_rotation(shuffled_keys, list(range(self.numb_sets))) + dispersed_interface = self._random_attribute_rotation(shuffled_keys, list(range(self.attrs_per_key))) self.interface_index = matrix_to_list(dispersed_interface) @staticmethod @@ -33,11 +33,11 @@ class UserInterface(BaseModel): def shuffle_interface(self): """just like dispersion but only half the sets are rotated""" - numb_of_selected_sets = self.numb_sets // 2 - # randomly shuffle half the sets. if numb_sets is odd, randomly add one 50% of the time - numb_of_selected_sets += choice([0, 1]) if (self.numb_sets & 1) == 1 else 0 - selected_sets = secure_fisher_yates_shuffle(list(range(self.numb_sets)))[:numb_of_selected_sets] - user_interface_matrix = list_to_matrix(self.interface_index, self.numb_sets) + numb_of_selected_sets = self.attrs_per_key // 2 + # randomly shuffle half the sets. if attrs_per_key is odd, randomly add one 50% of the time + numb_of_selected_sets += choice([0, 1]) if (self.attrs_per_key & 1) == 1 else 0 + selected_sets = secure_fisher_yates_shuffle(list(range(self.attrs_per_key)))[:numb_of_selected_sets] + user_interface_matrix = list_to_matrix(self.interface_index, self.attrs_per_key) shuffled_keys = secure_fisher_yates_shuffle(user_interface_matrix) interface_by_sets = [] for idx, attrs in enumerate(self.matrix_transpose(shuffled_keys)): @@ -48,7 +48,7 @@ class UserInterface(BaseModel): self.interface_index = matrix_to_list(self.matrix_transpose(interface_by_sets)) def _random_attribute_rotation(self, user_interface: list[list[int]], selected_sets: list[int]) -> list[list[int]]: - attr_rotation = secure_fisher_yates_shuffle(list(range(self.numb_keys)))[:self.numb_sets] + attr_rotation = secure_fisher_yates_shuffle(list(range(self.numb_of_keys)))[:self.attrs_per_key] transposed_user_interface = self.matrix_transpose(user_interface) assert (len(attr_rotation) == len(transposed_user_interface)) for idx, attr_set in enumerate(transposed_user_interface): @@ -59,7 +59,7 @@ class UserInterface(BaseModel): return self.matrix_transpose(transposed_user_interface) def attribute_adjacency_graph(self) -> dict[int, set[int]]: - user_interface_keypad = list_to_matrix(self.interface_index, self.numb_sets) + user_interface_keypad = list_to_matrix(self.interface_index, self.attrs_per_key) graph = {} for key in user_interface_keypad: for attr in key: @@ -69,6 +69,6 @@ class UserInterface(BaseModel): return graph def get_key_attr_idxs(self, key_numb: int) -> list[int]: - assert (0 <= key_numb < self.numb_keys) - keypad_attr_idx = list_to_matrix(self.interface_index, self.numb_sets) + assert (0 <= key_numb < self.numb_of_keys) + keypad_attr_idx = list_to_matrix(self.interface_index, self.attrs_per_key) return keypad_attr_idx[key_numb] diff --git a/test/test_nkode_api.py b/test/test_nkode_api.py index f553f75..e01e481 100644 --- a/test/test_nkode_api.py +++ b/test/test_nkode_api.py @@ -1,5 +1,6 @@ import pytest from nkode_api import NKodeAPI +from src.models import NKodePolicy @pytest.fixture() @@ -7,41 +8,42 @@ def nkode_api() -> NKodeAPI: return NKodeAPI() -@pytest.mark.parametrize("numb_keys,attrs_per_key,user_passcode", [ +@pytest.mark.parametrize("numb_of_keys,attrs_per_key,user_passcode", [ (10, 7, [3, 10, 27, 68]), ]) -def test_create_new_user_and_renew_keys(nkode_api, numb_keys, attrs_per_key, user_passcode): +def test_create_new_user_and_renew_keys(nkode_api, numb_of_keys, attrs_per_key, user_passcode): username = "test_username" - customer = nkode_api.create_new_customer(numb_keys, attrs_per_key) - session_id, set_interface = nkode_api.generate_index_interface(customer.customer_id) + nkode_policy = NKodePolicy() # default policy + customer_id = nkode_api.create_new_customer(numb_of_keys, attrs_per_key, nkode_policy) + session_id, set_interface = nkode_api.generate_index_interface(customer_id) key_selection = lambda interface: [interface.index(attr) // attrs_per_key for attr in user_passcode] set_key_selection = key_selection(set_interface) - confirm_interface = nkode_api.set_nkode(username, customer.customer_id, set_key_selection, session_id) + confirm_interface = nkode_api.set_nkode(username, customer_id, set_key_selection, session_id) confirm_key_selection = key_selection(confirm_interface) - response = nkode_api.confirm_nkode( + successful_confirm = nkode_api.confirm_nkode( username, - customer.customer_id, + customer_id, confirm_key_selection, session_id ) - assert ("success" == response) + assert successful_confirm - login_interface = nkode_api.get_login_index_interface(username, customer.customer_id) + login_interface = nkode_api.get_login_index_interface(username, customer_id) login_key_selection = key_selection(login_interface) - successful_login = nkode_api.login(customer.customer_id, username, login_key_selection) + successful_login = nkode_api.login(customer_id, username, login_key_selection) assert successful_login - successful_renew = nkode_api.renew_keys(customer.customer_id) + successful_renew = nkode_api.renew_keys(customer_id) assert successful_renew - login_interface = nkode_api.get_login_index_interface(username, customer.customer_id) + login_interface = nkode_api.get_login_index_interface(username, customer_id) login_key_selection = key_selection(login_interface) - successful_login = nkode_api.login(customer.customer_id, username, login_key_selection) + successful_login = nkode_api.login(customer_id, username, login_key_selection) assert successful_login - login_interface = nkode_api.get_login_index_interface(username, customer.customer_id) + login_interface = nkode_api.get_login_index_interface(username, customer_id) login_key_selection = key_selection(login_interface) - successful_login = nkode_api.login(customer.customer_id, username, login_key_selection) + successful_login = nkode_api.login(customer_id, username, login_key_selection) assert successful_login diff --git a/test/test_nkode_interface.py b/test/test_nkode_interface.py index b523990..8b6d4bd 100644 --- a/test/test_nkode_interface.py +++ b/test/test_nkode_interface.py @@ -1,14 +1,13 @@ import pytest -from src.customer_interface import CustomerInterface from src.user_interface import UserInterface + @pytest.mark.parametrize( - "numb_keys,attrs_per_key", - [(10, 7, )] + "numb_of_keys,attrs_per_key", + [(10, 7,)] ) -def test_attr_set_idx(numb_keys, attrs_per_key): - user_interface = UserInterface.new_interface(attrs_per_key, numb_keys) - customer_interface = CustomerInterface.new_interface(numb_keys, attrs_per_key) +def test_attr_set_idx(numb_of_keys, attrs_per_key): + user_interface = UserInterface.new(attrs_per_key, numb_of_keys) for attr_idx in range(70): user_interface_idx = user_interface.interface_index[attr_idx] diff --git a/test/test_user_cipher_keys.py b/test/test_user_cipher_keys.py index f484ad6..3e76dcf 100644 --- a/test/test_user_cipher_keys.py +++ b/test/test_user_cipher_keys.py @@ -22,11 +22,11 @@ def test_encode_decode_base64(passcode_len): (10, 7,) ]) def test_decode_mask(numb_of_keys, attrs_per_key): - customer = CustomerInterface.new_interface(numb_of_keys, attrs_per_key) + customer = CustomerInterface.new(numb_of_keys, attrs_per_key) passcode_entry = generate_random_nonrepeating_list(numb_of_keys*attrs_per_key, max_val=70)[:4] - passcode_values = [customer.customer_interface[idx] for idx in passcode_entry] + passcode_values = [customer.attr_vals[idx] for idx in passcode_entry] set_vals = customer.set_vals - user_keys = UserCipherKeys.new_user_encipher_keys(numb_of_keys, attrs_per_key, set_vals) + user_keys = UserCipherKeys.new(numb_of_keys, attrs_per_key, set_vals) passcode = user_keys.encipher_nkode(passcode_entry, customer) orig_passcode_set_vals = [customer.get_attr_set_val(attr) for attr in passcode_values] diff --git a/test/test_user_interface.py b/test/test_user_interface.py index c677385..a31635d 100644 --- a/test/test_user_interface.py +++ b/test/test_user_interface.py @@ -4,7 +4,7 @@ from src.user_interface import UserInterface @pytest.fixture() def user_interface(): - return UserInterface.new_interface(7, 10) + return UserInterface.new(7, 10) def test_dispersion(user_interface):