#!/usr/bin/python3
# ===================================================================
# data store class
# -------------------------------------------------------------------
# Data store records are CSV strings. The first field is the record
# ID. The remaining fields are data. The record ID is an integer in
# string format (with leading/trailing spaces removed).
# ===================================================================

import time


class DataStore:
    """In-memory record store keyed by record ID.

    Records live in a plain dict (``self.ds``) mapping ID -> record.
    An optional artificial delay (``self.latency``, in seconds) is
    applied on every ``fetch`` call.
    """

    # Largest latency (seconds) accepted by set_fetch_latency().
    MAX_LATENCY = 0.5

    def __init__(self, latency=0.0):
        """Create an empty store with the given fetch latency (seconds).

        The initial latency is stored as given; only
        set_fetch_latency() range-checks its argument.
        """
        self.ds = {}
        self.latency = latency

    def add(self, id, rec):
        """Add ``rec`` under ``id``.

        Returns True on success, False if ``id`` is already present
        (the existing record is left untouched).
        """
        if id in self.ds:
            return False
        self.ds[id] = rec
        return True

    def delete(self, id):
        """Remove the record stored under ``id``.

        Returns True if a record was removed, False if ``id`` was not
        in the store.
        """
        if id not in self.ds:
            return False
        del self.ds[id]
        return True

    def fetch(self, id):
        """Return the record stored under ``id``, or None if absent.

        Sleeps for ``self.latency`` seconds first when the latency is
        greater than zero (the sleep happens even for a missing ID).
        """
        if self.latency > 0.0:
            time.sleep(self.latency)
        return self.ds.get(id)

    def change(self, id, rec):
        """Store ``rec`` under ``id`` and return True.

        The assignment is unconditional: an ID that is not yet in the
        store is created rather than rejected.
        """
        self.ds[id] = rec
        return True

    def count(self):
        """Return the number of records in the store."""
        return len(self.ds)

    def clear(self):
        """Remove every record; return how many were removed."""
        removed = self.count()
        self.ds = {}
        return removed

    def set_fetch_latency(self, newlat):
        """Set the fetch latency from ``newlat`` (number or numeric string).

        Returns True when the value converts to a float in the range
        0.0 .. MAX_LATENCY inclusive; otherwise returns False and
        leaves the current latency unchanged.
        """
        try:
            n = float(newlat)
        except (TypeError, ValueError, OverflowError):
            return False
        # The chained comparison is False for NaN, so NaN is rejected
        # together with out-of-range values.
        if not (0.0 <= n <= self.MAX_LATENCY):
            return False
        self.latency = n
        return True

    def get_fetch_latency(self):
        """Return the current fetch latency in seconds."""
        return self.latency

    def dump(self, limit=10):
        """Print up to ``limit`` records; return the number printed.

        Records are printed in dict iteration order as ``[id] record``.
        A ``limit`` of zero or less prints nothing.
        """
        shown = 0
        for key, rec in self.ds.items():
            # Test the limit before printing so limit <= 0 prints none.
            if shown >= limit:
                break
            print(f'[{key:3}] {rec}')
            shown += 1
        return shown

    def dump99(self):
        """Print the raw underlying dict (debugging aid)."""
        print(self.ds)


# -------------------------------------------------------------------
# ---- main
# -------------------------------------------------------------------

if __name__ == '__main__':

    def create_test_data(db):
        """Add four sample records to ``db``; always returns 4.

        Records whose ID already exists are skipped by db.add().
        """
        db.add('1', '1,abc,def')
        db.add('2', '2,ghi,jkl')
        db.add('3', '3,lmn,opq')
        db.add('4', '4,rst,uvw,xyz')
        return 4

    # Imported here so that importing this module for its DataStore
    # class does not require the user_interface module.
    import user_interface as ui

    db = DataStore()

    create_test_data(db)

    while True:

        # ---- menu

        ui.clear_screen()
        print('------test data store-----')
        print('option description')
        print('------ ------------------')
        print(' 1 add DB entry')
        print(' 2 delete DB entry')
        print(' 3 fetch DB entry')
        print(' 4 change DB entry')
        print(' 5 clear DB')
        print(' 6 set fetch latency')
        print(' 7 get fetch latency')
        print(' 10 DB entry count')
        print(' 11 dump DB')
        print(' 12 load test data')
        print(' 13 exit program')

        # ---- get user input

        print()
        s = ui.get_user_input(' enter menu option: ')
        if not s:  # empty string
            break

        tf, i = ui.is_integer(s)
        if not tf:
            print()
            print('input option error')
            ui.pause()
            continue

        # ---- process selection

        if i == 1:  # add DB entry
            print()
            msg = ('DB records are a CSV strings with the first\n'
                   'field the ID (DB records and IDs are strings)')
            print(msg)
            print()
            rec = ui.get_user_input(' Enter DB record (CSV): ')
            if not rec:
                print()
                print('nothing entered')
                ui.pause()
                continue
            # The record ID is the first CSV field, stripped of spaces.
            id = rec.split(',', 1)[0].strip()
            if not id:
                print()
                print('bad ID entered')
            elif not db.add(id, rec):
                print()
                print(f'ID {id} already in data store')
            ui.pause()
            continue

        if i == 2:  # delete DB entry
            print()
            # Strip so the key matches the stripped IDs used by add/change.
            id = ui.get_user_input(' enter record ID: ').strip()
            x = db.delete(id)
            print()
            if x:
                print('DB entry deleted')
            else:
                print('key not found')
            ui.pause()
            continue

        if i == 3:  # fetch DB entry
            print()
            # Strip so the key matches the stripped IDs used by add/change.
            id = ui.get_user_input(' enter record ID: ').strip()
            if not id:
                print()
                print('no ID entered')
                ui.pause()
                continue
            rec = db.fetch(id)
            print()
            if rec is None:
                print(f'DB record "{id}" not found')
            else:
                print(f'DB record is "{rec}"')
            ui.pause()
            continue

        if i == 4:  # change DB entry
            print()
            msg = ('DB entries are a CSV strings with the first\n'
                   'field the ID (DB entries and IDs are strings)')
            print(msg)
            print()
            rec = ui.get_user_input(' enter DB record: ')
            if not rec:
                print()
                print('nothing entered')
                ui.pause()
                continue
            id = rec.split(',', 1)[0].strip()
            if not id:
                print()
                print('bad ID entered')
            else:
                db.change(id, rec)
            ui.pause()
            continue

        if i == 5:  # clear DB of entries
            c = db.clear()
            print()
            print(f'{c} entries deleted')
            ui.pause()
            continue

        if i == 6:  # set fetch latency
            print()
            s = ui.get_user_input(' enter fetch latency (0.0 to 0.5): ')
            if not s:
                print()
                print('nothing entered')
                ui.pause()
                continue
            tf, f = ui.is_float(s)
            if not tf:
                print()
                print(f'bad latency value {s}')
                ui.pause()
                continue
            tf = db.set_fetch_latency(f)
            if not tf:
                print()
                print('set latency failed')
                ui.pause()
            continue

        if i == 7:  # get fetch latency
            latency = db.get_fetch_latency()
            print()
            print(f'fetch latency is {latency:.4}')
            ui.pause()
            continue

        if i == 10:  # DB count
            print()
            print(f'count = {db.count()}')
            ui.pause()
            continue

        if i == 11:  # DB dump
            print()
            c = db.dump()
            print()
            print(f'dump count = {c}')
            ui.pause()
            continue

        if i == 12:  # load test data
            create_test_data(db)
            print()
            print(f'{db.count()} DB entries')
            ui.pause()
            continue

        if i == 13:  # exit program
            break

        if i == 99:  # 'raw' data dump (not listed in the menu)
            print()
            db.dump99()
            ui.pause()
            continue

        # ---- selection error

        print(f'unknow selection {i} entered')
        ui.pause()

    print()