From ec398c064a2d3def0c63c9e64ec3bc3f748e1ee0 Mon Sep 17 00:00:00 2001 From: Donovan Date: Tue, 16 Jul 2024 12:52:44 -0500 Subject: [PATCH] refactor code --- nkode_demo.ipynb | 55 --------- src/customer_db_model.py | 56 +++++++++ ...ode_interface.py => customer_interface.py} | 18 +-- src/pseudo_nkode_api.py | 112 +----------------- src/session_cache.py | 31 +++++ src/user_cipher_keys.py | 8 +- src/user_db_model.py | 30 +++++ test/test_nkode_interface.py | 2 +- 8 files changed, 126 insertions(+), 186 deletions(-) create mode 100644 src/customer_db_model.py rename src/{nkode_interface.py => customer_interface.py} (62%) create mode 100644 src/session_cache.py create mode 100644 src/user_db_model.py diff --git a/nkode_demo.ipynb b/nkode_demo.ipynb index 5652054..db13099 100644 --- a/nkode_demo.ipynb +++ b/nkode_demo.ipynb @@ -1,60 +1,5 @@ { "cells": [ - { - "cell_type": "code", - "execution_count": 12, - "outputs": [], - "source": [ - "import string" - ], - "metadata": { - "collapsed": false, - "ExecuteTime": { - "end_time": "2024-07-10T23:28:52.928913Z", - "start_time": "2024-07-10T23:28:52.924646Z" - } - } - }, - { - "cell_type": "code", - "execution_count": 26, - "outputs": [], - "source": [ - "attributes = string.ascii_letters + string.digits + string.punctuation\n", - "attributes = attributes.replace(\"'\", \"\")\n", - "attributes = attributes.replace('\"', \"\")\n", - "attributes = attributes[:70]" - ], - "metadata": { - "collapsed": false, - "ExecuteTime": { - "end_time": "2024-07-11T11:58:52.615905Z", - "start_time": "2024-07-11T11:58:52.614545Z" - } - } - }, - { - "cell_type": "code", - "execution_count": 28, - "outputs": [ - { - "data": { - "text/plain": "70" - }, - "execution_count": 28, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [], - "metadata": { - "collapsed": false, - "ExecuteTime": { - "end_time": "2024-07-11T12:00:03.004548Z", - "start_time": "2024-07-11T12:00:03.000564Z" - } - } - }, { "cell_type": "code", "execution_count": null, diff --git a/src/customer_db_model.py b/src/customer_db_model.py new file mode 100644 index 0000000..585bcce --- /dev/null +++ b/src/customer_db_model.py @@ -0,0 +1,56 @@ +from uuid import UUID +from pydantic import BaseModel +from src.customer_interface import CustomerInterface +from src.user_db_model import UserDBModel +from src.utils import xor_lists + + +class CustomerDBModel(BaseModel): + customer_id: UUID + interface: CustomerInterface + users: dict[str, UserDBModel] + + def add_new_user(self, user: UserDBModel): + self.users[user.username] = user + + def valid_key_entry(self, username, selected_keys) -> bool: + assert (username in self.users.keys()) + assert (all(key_idx < self.interface.numb_keys for key_idx in selected_keys)) + passcode_len = len(selected_keys) + user = self.users[username] + + passcode_set_vals = user.user_keys.decipher_mask( + user.enciphered_passcode.mask, self.interface.set_vals, len(selected_keys)) + set_vals_idx = [self.interface.get_set_index(set_val) for set_val in passcode_set_vals] + + presumed_selected_attributes = [] + presumed_selected_attributes_idx = [] + for idx in range(passcode_len): + key_numb = selected_keys[idx] + key_attribute_idxs = user.user_interface.get_key_attr_idxs(key_numb) + + set_idx = set_vals_idx[idx] + selected_attr_idx = key_attribute_idxs[set_idx] + presumed_selected_attributes_idx.append(selected_attr_idx) + presumed_selected_attributes.append(selected_attr_idx) + + enciphered_attr = user.user_keys.encipher_salt_hash_code(presumed_selected_attributes, self.interface) + if enciphered_attr != user.enciphered_passcode.code: + return False + + if user.renew: + user.refresh_passcode(presumed_selected_attributes_idx, self.interface) + return True + + def renew_keys(self) -> bool: + attrs_before = self.interface.customer_interface.copy() + sets_before = self.interface.set_vals.copy() + self.interface.renew_interface() + attrs_after = self.interface.customer_interface + sets_after = self.interface.set_vals + + attrs_xor = xor_lists(attrs_after, attrs_before) + set_xor = xor_lists(sets_after, sets_before) + for user in self.users.values(): + user.renew_keys(set_xor, attrs_xor) + return True diff --git a/src/nkode_interface.py b/src/customer_interface.py similarity index 62% rename from src/nkode_interface.py rename to src/customer_interface.py index 40e6671..06d4afd 100644 --- a/src/nkode_interface.py +++ b/src/customer_interface.py @@ -1,8 +1,5 @@ from pydantic import BaseModel -from src.utils import ( - generate_random_nonrepeating_list, list_to_matrix, - secure_fisher_yates_shuffle -) +from src.utils import generate_random_nonrepeating_list class CustomerInterface(BaseModel): @@ -27,19 +24,6 @@ class CustomerInterface(BaseModel): self.customer_interface = generate_random_nonrepeating_list(self.attrs_per_key * self.numb_keys) self.set_vals = generate_random_nonrepeating_list(self.attrs_per_key) - def get_interface_by_set(self) -> dict[int, list[int]]: - interface_by_set = {self.set_vals[set_val]: [] for set_val in self.set_vals} - for idx, attr_set in enumerate(self.interface): - set_idx = idx % self.numb_of_sets - interface_by_set[self.set_vals[set_idx]].append(attr_set) - return interface_by_set - - def get_interface_by_key(self) -> list[list[int]]: - return list_to_matrix(self.interface, self.numb_keys) - - def get_random_index_interface(self) -> list[int]: - return secure_fisher_yates_shuffle(list(range(len(self.interface)))) - def get_attr_set_val(self, attr: int) -> int: assert (attr in self.customer_interface) attr_idx = self.customer_interface.index(attr) diff --git a/src/pseudo_nkode_api.py b/src/pseudo_nkode_api.py index ea2421e..a2ebbc8 100644 --- a/src/pseudo_nkode_api.py +++ b/src/pseudo_nkode_api.py @@ -1,95 +1,12 @@ from uuid import UUID, uuid4 from pydantic import BaseModel -from src.models import EncipheredNKode +from src.customer_db_model import CustomerDBModel +from src.session_cache import SessionCacheModel from src.user_cipher_keys import UserCipherKeys +from src.user_db_model import UserDBModel from src.user_interface import UserInterface -from src.nkode_interface import CustomerInterface -from src.utils import xor_lists - - -class UserDBModel(BaseModel): - username: str - enciphered_passcode: EncipheredNKode - user_keys: UserCipherKeys - user_interface: UserInterface - - def renew_keys(self, sets_xor: list[int], attrs_xor: list[int]): - self.user_keys.renew = True - self.user_keys.set_key = xor_lists(self.user_keys.set_key, sets_xor) - self.user_keys.alpha_key = xor_lists(self.user_keys.alpha_key, attrs_xor) - - def refresh_passcode(self, passcode_attr_idx: list[int], customer_interface: CustomerInterface): - self.user_keys = UserCipherKeys.new_user_encipher_keys( - customer_interface.numb_keys, - customer_interface.attrs_per_key, - customer_interface.set_vals - - ) - self.enciphered_passcode = self.user_keys.encipher_nkode(passcode_attr_idx, customer_interface) - -class CustomerDBModel(BaseModel): - customer_id: UUID - interface: CustomerInterface - users: dict[str, UserDBModel] - - def add_new_user(self, user: UserDBModel): - self.users[user.username] = user - - def valid_key_entry(self, username, selected_keys) -> bool: - assert (username in self.users.keys()) - assert (all(key_idx < self.interface.numb_keys for key_idx in selected_keys)) - passcode_len = len(selected_keys) - user = self.users[username] - - passcode_set_vals = user.user_keys.decipher_mask( - user.enciphered_passcode.mask, self.interface.set_vals, len(selected_keys)) - set_vals_idx = [self.interface.get_set_index(set_val) for set_val in passcode_set_vals] - - presumed_selected_attributes = [] - presumed_selected_attributes_idx = [] - for idx in range(passcode_len): - key_numb = selected_keys[idx] - key_attribute_idxs = user.user_interface.get_key_attr_idxs(key_numb) - - set_idx = set_vals_idx[idx] - selected_attr_idx = key_attribute_idxs[set_idx] - presumed_selected_attributes_idx.append(selected_attr_idx) - presumed_selected_attributes.append(selected_attr_idx) - - enciphered_attr = user.user_keys.encipher_salt_hash_code(presumed_selected_attributes, self.interface) - if enciphered_attr != user.enciphered_passcode.code: - return False - - if user.user_keys.renew: - user.refresh_passcode(presumed_selected_attributes_idx, self.interface) - - return True - - def renew_keys(self) -> bool: - attrs_before = self.interface.customer_interface.copy() - sets_before = self.interface.set_vals.copy() - self.interface.renew_interface() - attrs_after = self.interface.customer_interface - sets_after = self.interface.set_vals - - attrs_xor = xor_lists(attrs_after, attrs_before) - set_xor = xor_lists(sets_after, sets_before) - for user in self.users.values(): - user.renew_keys(set_xor, attrs_xor) - return True - - -class SessionCacheModel(BaseModel): - session_id: UUID - set_interface: list[int] | None = None - confirm_interface: list[int] | None = None - set_key_entry: list[int] | None = None - confirm_key_entry: list[int] | None = None - username: str | None = None - customer_id: UUID - - # timeout: datetime +from src.customer_interface import CustomerInterface class PseudoNKodeAPI(BaseModel): @@ -149,11 +66,12 @@ class PseudoNKodeAPI(BaseModel): customer_id == self.sessions[session_id].customer_id and username == self.sessions[session_id].username ) + session = self.sessions[session_id] customer = self.customers[customer_id] numb_of_keys = customer.interface.numb_keys attrs_per_key = customer.interface.attrs_per_key assert (all(0 <= key <= numb_of_keys for key in confirm_key_entry)) - passcode = self._deduce_passcode(session_id, attrs_per_key, confirm_key_entry) + passcode = session.deduce_passcode(attrs_per_key, confirm_key_entry) set_values = customer.interface.set_vals new_user_keys = UserCipherKeys.new_user_encipher_keys(numb_of_keys, attrs_per_key, set_values) enciphered_passcode = new_user_keys.encipher_nkode(passcode, customer.interface) @@ -171,24 +89,6 @@ class PseudoNKodeAPI(BaseModel): return "success" # del self.sessions[session_id] - def _deduce_passcode(self, session_id: UUID, attrs_per_key, confirm_key_entry: list[int]) -> list[int]: - session = self.sessions[session_id] - set_key_entry = session.set_key_entry - assert (len(set_key_entry) == len(confirm_key_entry)) - set_interface = session.set_interface - confirm_interface = session.confirm_interface - set_key_vals = [set_interface[key * attrs_per_key:(key + 1) * attrs_per_key] for key in set_key_entry] - confirm_key_vals = [confirm_interface[key * attrs_per_key:(key + 1) * attrs_per_key] for key in - confirm_key_entry] - - passcode = [] - for idx in range(len(set_key_entry)): - set_key = set(set_key_vals[idx]) - confirm_key = set(confirm_key_vals[idx]) - intersection = list(set_key.intersection(confirm_key)) - assert (len(intersection) == 1) - passcode.append(intersection[0]) - return passcode def login(self, customer_id: UUID, username: str, key_selection: list[int]) -> bool: assert (customer_id in self.customers.keys()) diff --git a/src/session_cache.py b/src/session_cache.py new file mode 100644 index 0000000..8ca5891 --- /dev/null +++ b/src/session_cache.py @@ -0,0 +1,31 @@ +from uuid import UUID + +from pydantic import BaseModel + + +class SessionCacheModel(BaseModel): + session_id: UUID + set_interface: list[int] | None = None + confirm_interface: list[int] | None = None + set_key_entry: list[int] | None = None + confirm_key_entry: list[int] | None = None + username: str | None = None + customer_id: UUID + + def deduce_passcode(self, attrs_per_key, confirm_key_entry: list[int]) -> list[int]: + set_key_entry = self.set_key_entry + assert (len(set_key_entry) == len(confirm_key_entry)) + set_interface = self.set_interface + confirm_interface = self.confirm_interface + set_key_vals = [set_interface[key * attrs_per_key:(key + 1) * attrs_per_key] for key in set_key_entry] + confirm_key_vals = [confirm_interface[key * attrs_per_key:(key + 1) * attrs_per_key] for key in + confirm_key_entry] + + passcode = [] + for idx in range(len(set_key_entry)): + set_key = set(set_key_vals[idx]) + confirm_key = set(confirm_key_vals[idx]) + intersection = list(set_key.intersection(confirm_key)) + assert (len(intersection) == 1) + passcode.append(intersection[0]) + return passcode diff --git a/src/user_cipher_keys.py b/src/user_cipher_keys.py index b5e100d..d08f8b0 100644 --- a/src/user_cipher_keys.py +++ b/src/user_cipher_keys.py @@ -5,7 +5,7 @@ from secrets import choice from pydantic import BaseModel from src.models import EncipheredNKode -from src.nkode_interface import CustomerInterface +from src.customer_interface import CustomerInterface from src.utils import generate_random_nonrepeating_list, xor_lists, int_array_to_bytes @@ -16,7 +16,6 @@ class UserCipherKeys(BaseModel): mask_key: list[int] salt: bytes max_nkode_len: int = 10 - renew: bool = False @staticmethod def new_user_encipher_keys(numb_of_keys: int, attrs_per_key: int, set_values: list[int]): @@ -40,11 +39,6 @@ class UserCipherKeys(BaseModel): padded_user_mask.append(choice(set_vals)) return padded_user_mask - @staticmethod - def pad_user_code(user_code: list[int], max_nkode_len: int) -> list[int]: - assert (len(user_code) <= max_nkode_len) - return user_code + [0 for _ in range(max_nkode_len - len(user_code))] - @staticmethod def encode_base64_str(data: list[int]) -> str: return base64.b64encode(int_array_to_bytes(data)).decode("utf-8") diff --git a/src/user_db_model.py b/src/user_db_model.py new file mode 100644 index 0000000..8046694 --- /dev/null +++ b/src/user_db_model.py @@ -0,0 +1,30 @@ +from pydantic import BaseModel + +from src.models import EncipheredNKode +from src.customer_interface import CustomerInterface +from src.user_cipher_keys import UserCipherKeys +from src.user_interface import UserInterface +from src.utils import xor_lists + + +class UserDBModel(BaseModel): + username: str + enciphered_passcode: EncipheredNKode + user_keys: UserCipherKeys + user_interface: UserInterface + renew: bool = False + + def renew_keys(self, sets_xor: list[int], attrs_xor: list[int]): + self.renew = True + self.user_keys.set_key = xor_lists(self.user_keys.set_key, sets_xor) + self.user_keys.alpha_key = xor_lists(self.user_keys.alpha_key, attrs_xor) + + def refresh_passcode(self, passcode_attr_idx: list[int], customer_interface: CustomerInterface): + self.user_keys = UserCipherKeys.new_user_encipher_keys( + customer_interface.numb_keys, + customer_interface.attrs_per_key, + customer_interface.set_vals + + ) + self.enciphered_passcode = self.user_keys.encipher_nkode(passcode_attr_idx, customer_interface) + self.renew = False diff --git a/test/test_nkode_interface.py b/test/test_nkode_interface.py index c527821..b523990 100644 --- a/test/test_nkode_interface.py +++ b/test/test_nkode_interface.py @@ -1,5 +1,5 @@ import pytest -from src.nkode_interface import CustomerInterface +from src.customer_interface import CustomerInterface from src.user_interface import UserInterface @pytest.mark.parametrize(