Adding security mechanisms

This commit is contained in:
chabisik 2022-12-22 21:39:32 +01:00
parent d75122ad5c
commit 6074076841
2 changed files with 148 additions and 12 deletions

Binary file not shown.

158
myapp.py
View file

@ -4,19 +4,142 @@ import flask
import evdev
import requests
import time
import datasaver
import argparse
import threading
import subprocess
import urllib.parse
parser = argparse.ArgumentParser(description="To set up database location and intial values by system administrator")
parser.add_argument("--setDatabase", type=int, default=0, help="Wether system administrator want to set up database (1) or not (0)")
set_database = parser.parse_args().setDatabase
app = flask.Flask(import_name=__name__, template_folder=".")
actuator_locker = threading.Lock()
database_locker = threading.Lock()
def database_setter():
database_location = ""
database_location_ok = False
#---trying to retrieve a previously used path---
if os.path.exists("database_location_path.pickle"):
dp = datasaver.DataPockets("database_location_path")
try:
previously_set_path = dp.get("database_location")[0]
decision = None
while decision!="yes" and decision!="no":
print("A previsouly used path was detected:",previously_set_path)
decision = str(input("Use it? ('yes'/'no'): "))
except:
pass
if decision=="yes":
database_location = previously_set_path
database_location_ok = True
#---setting or creating a path---
while database_location_ok != True:
database_location = str(input("Enter absolute path of folder containing 'database.pickle' to use: "))
if not os.path.exists(database_location):
decision = None
while decision!="yes" and decision!="no":
print("path '",database_location,"' does not exist!")
decision = str(input("Create it? ('yes'/'no'): "))
if decision=="yes":
try:
os.makedirs(database_location)
database_location_ok = True
except:
print("An error occurs during path creation!")
database_location_ok = False
else:
database_location_ok = False
else:
database_location_ok = True
#---add defined path into history---
dp = datasaver.DataPockets("database_location_path")
dp.append_as_it(pocket_name="database_location", what_to_append=database_location, erase_first=True)
dp.save()
#---adding 'database_location' into paths where python will search for files---
sys.path.insert(0,database_location)
#---open database---
trusted_admins_ok = True
dp = datasaver.DataPockets(filename="database",directory_path=database_location)
try:
trusted_admins = dp.get(pocket_name="trusted_admins")
print("Current trusted admins:")
for admin in trusted_admins: print(admin)
except:
print("There is no trusted admins: database will be cleared")
dp.append_as_it(pocket_name="trusted_admins", what_to_append="init", erase_first=True)
dp.append_as_it(pocket_name="upgraded_to_admins", what_to_append="init", erase_first=True)
dp.append_as_it(pocket_name="road_to_admin", what_to_append="init", erase_first=True)
dp.append_as_it(pocket_name="guests", what_to_append="init", erase_first=True)
dp.append_as_it(pocket_name="removed", what_to_append="init", erase_first=True)
trusted_admins_ok = False
if trusted_admins_ok:
try:
upgraded_to_admins = dp.get(pocket_name="upgraded_to_admins")
print("Current upgraded admins:")
for admin in upgraded_to_admins: print(admin)
except:
dp.append_as_it(pocket_name="upgraded_to_admins", what_to_append="empty", erase_first=True)
dp.clear_pocket(pocket_name="upgraded_to_admins")
try:
futur_admins = dp.get(pocket_name="road_to_admin")
print("Current in road_to_admin:")
for futur_admin in futur_admins: print(futur_admin)
except:
dp.append_as_it(pocket_name="road_to_admin", what_to_append="empty", erase_first=True)
dp.clear_pocket(pocket_name="road_to_admin")
try:
guests = dp.get(pocket_name="guests")
print("Current guests:")
for guest in guests: print(guest)
except:
dp.append_as_it(pocket_name="guests", what_to_append="empty", erase_first=True)
dp.clear_pocket(pocket_name="guests")
try:
rems = dp.get(pocket_name="removed")
print("Already removed:")
for rem in rems: print(rem)
except:
dp.append_as_it(pocket_name="removed", what_to_append="empty", erase_first=True)
dp.clear_pocket(pocket_name="removed")
dp.save()
#---filling trusted admins---
decision = None
while decision!="yes" and decision!="no":
decision = str(input("Would you like to insert trusted admins? ('yes'/'no'): "))
if decision=="yes":
print("INFO: end insertion by typing 'end'")
admin_to_insert = None
while admin_to_insert!="end":
admin_to_insert = str(input("Enter a trusted admin phone number: "))
if admin_to_insert!="end": dp.append_as_it(pocket_name="trusted_admins", what_to_append=admin_to_insert)
dp.save()
#---generate or change config file values---
if not os.path.exists(database_location+"/config.py"):
subprocess.run("touch "+database_location+"/config.py", shell=True, executable="/bin/bash")
#---entering SMS keywords---
decision = None
while decision!="yes" and decision!="no":
decision = str(input("Would you like to change SMS keywords? ('yes'/'no'): "))
if decision=="yes":
keywords = []
print("INFO: end insertion by typing 'end'")
keyword = None
while keyword!="end":
keyword = str(input("Enter a keyword to detected for actioning actuator: "))
keywords.append(keyword)
subprocess.run("echo \"KEYWORDS="+str(keywords)+"\" >> "+database_location+"/config.py", shell=True, executable="/bin/bash")
#---entering internet option password---
decision = None
while decision!="yes" and decision!="no":
decision = str(input("Would you like to change internet option password? ('yes'/'no'): "))
if decision=="yes":
passw = str(input("Enter the password: "))
subprocess.run("echo \"INTERNET_PWD='"+passw+"'\" >> "+database_location+"/config.py", shell=True, executable="/bin/bash")
try:
sys.path.insert(0,'/home/pi/information')
import config
except:
print("Unable to add 'information' folder into system path OR 'config.py' not found")
lock = threading.Lock()
def opener(lck):
with lck:
for _ in range(5):
@ -69,8 +192,8 @@ def receiver_from_internet():
data = flask.request.get_data(as_text=True)
data_dict = urllib.parse.parse_qs(qs=data)
print(data_dict)
if "pwd_retriever" in data_dict and data_dict["pwd_retriever"][0]=='please':
op = threading.Thread(target=opener, args=(lock,))
if "pwd_retriever" in data_dict and data_dict["pwd_retriever"][0]==config.INTERNET_PWD:
op = threading.Thread(target=opener, args=(actuator_locker,))
op.start()
return flask.render_template("success.html")
else:
@ -85,9 +208,11 @@ def receiver():
response_events = {"events":[]}
#---action=incoming---
if "action" in data_dict and data_dict["action"]=="incoming":
if "please" in data_dict["message"].lower():
op = threading.Thread(target=opener, args=(lock,))
for keyword in config.KEYWORDS:
if keyword in data_dict["message"].lower():
op = threading.Thread(target=opener, args=(actuator_locker,))
op.start()
break
response_events["events"].append({"event":"log","message":"Server received "+data_dict["message_type"]+" from "+data_dict["from"]})
return flask.jsonify(response_events)
#---action=outgoing---
@ -114,7 +239,18 @@ def receiver():
if __name__=="__main__":
remote_handler_thread = threading.Thread(target=remote_handler, args=(lock,))
if set_database==1: database_setter()
else:
dp = datasaver.DataPockets("database_location_path")
try:
database_location = dp.get("database_location")[0]
sys.path.insert(0,database_location)
except:
print("Unable to find database location folder")
exit(-1)
#---import config file---
import config
remote_handler_thread = threading.Thread(target=remote_handler, args=(actuator_locker,))
remote_handler_thread.start()
localtunnel_thread = threading.Thread(target=localtunnel_launcher)
localtunnel_thread.start()