285 lines
No EOL
12 KiB
Python
285 lines
No EOL
12 KiB
Python
import argparse
import hmac
import os
import subprocess
import sys
import threading
import time
import urllib.parse

import evdev
import flask
import requests

import datasaver
|
|
|
|
# Command-line options. They are parsed at import time because the
# ``__main__`` section at the bottom of the file reads ``set_database`` and
# ``launch_app`` as module-level names.
parser = argparse.ArgumentParser(description="To set up database location and intial values by system administrator")
parser.add_argument("--setDatabase", type=int, default=0, help="Wether system administrator want to set up database (1) or not (0)")
parser.add_argument("--launch", type=int, default=1, help="Wether to launch the application (1) or not (0)")
# Parse once and reuse the namespace instead of re-parsing per option.
_cli_args = parser.parse_args()
set_database = _cli_args.setDatabase
launch_app = _cli_args.launch

# Flask application; "." makes Flask look for templates in the application
# root folder instead of a "templates" sub-folder.
app = flask.Flask(import_name=__name__, template_folder=".")
# Held by opener() while it sends requests to the actuator.
actuator_locker = threading.Lock()
# NOTE(review): not acquired anywhere in this file; presumably meant to be
# passed to users_manager() to serialise database access.
database_locker = threading.Lock()
|
|
|
|
def _ask_yes_no(question, reminder=None):
    """Prompt until the operator answers exactly 'yes' or 'no'; return the answer.

    reminder: optional tuple of values printed (as ``print(*reminder)``)
    before every prompt.
    """
    answer = None
    while answer not in ("yes", "no"):
        if reminder is not None:
            print(*reminder)
        answer = str(input(question))
    return answer


def _show_or_create_pocket(dp, pocket_name, title):
    """Print every entry of *pocket_name*; create it empty if it cannot be read."""
    try:
        entries = dp.get(pocket_name=pocket_name)
        print(title)
        for entry in entries:
            print(entry)
    except Exception:
        # The pocket is presumably missing: create it, then empty it.
        dp.append_as_it(pocket_name=pocket_name, what_to_append="empty", erase_first=True)
        dp.clear_pocket(pocket_name=pocket_name)


def _append_config_line(config_path, line):
    """Append one line of Python source to the config file."""
    with open(config_path, "a", encoding="utf-8") as config_file:
        config_file.write(line + "\n")


def database_setter():
    """Interactively configure the database folder, its pockets and config.py.

    Steps, in order:
      1. offer to reuse the folder stored in 'database_location_path', else
         ask for a folder (creating it on request);
      2. store the chosen folder in 'database_location_path' and put it first
         in ``sys.path`` so that ``import config`` can find 'config.py';
      3. open the 'database' pockets, print their content and create the
         missing ones;
      4. optionally add trusted admin phone numbers;
      5. optionally append new ``KEYWORDS`` / ``INTERNET_PWD`` assignments to
         'config.py' (later assignments override earlier ones on import).

    Everything is read from stdin; nothing is returned.
    """
    database_location = ""
    database_location_ok = False

    #---trying to retrieve a previously used path---
    if os.path.exists("database_location_path.pickle"):
        dp = datasaver.DataPockets("database_location_path")
        try:
            previously_set_path = dp.get("database_location")[0]
        except Exception:
            # Unreadable history: fall through to manual entry. (The original
            # code reached an unbound 'decision' variable in this case.)
            pass
        else:
            decision = _ask_yes_no(
                "Use it? ('yes'/'no'): ",
                reminder=("A previsouly used path was detected:", previously_set_path),
            )
            if decision == "yes":
                database_location = previously_set_path
                database_location_ok = True

    #---setting or creating a path---
    while not database_location_ok:
        database_location = str(input("Enter absolute path of folder containing 'database.pickle' to use: "))
        if os.path.exists(database_location):
            database_location_ok = True
            continue
        decision = _ask_yes_no(
            "Create it? ('yes'/'no'): ",
            reminder=("path '", database_location, "' does not exist!"),
        )
        if decision == "yes":
            try:
                os.makedirs(database_location)
                database_location_ok = True
            except OSError:
                print("An error occurs during path creation!")

    #---add defined path into history---
    dp = datasaver.DataPockets("database_location_path")
    dp.append_as_it(pocket_name="database_location", what_to_append=database_location, erase_first=True)
    dp.save()

    #---adding 'database_location' into paths where python will search for files---
    sys.path.insert(0, database_location)

    #---open database---
    dp = datasaver.DataPockets(filename="database", directory_path=database_location)
    try:
        trusted_admins = dp.get(pocket_name="trusted_admins")
        print("Current trusted admins:")
        for admin in trusted_admins:
            print(admin)
    except Exception:
        # Without trusted admins the whole database is re-initialised.
        print("There is no trusted admins: database will be cleared")
        for pocket in ("trusted_admins", "upgraded_to_admins", "road_to_admin", "guests", "removed"):
            dp.append_as_it(pocket_name=pocket, what_to_append="init", erase_first=True)
    else:
        _show_or_create_pocket(dp, "upgraded_to_admins", "Current upgraded admins:")
        _show_or_create_pocket(dp, "road_to_admin", "Current in road_to_admin:")
        _show_or_create_pocket(dp, "guests", "Current guests:")
        _show_or_create_pocket(dp, "removed", "Already removed:")
    dp.save()

    #---filling trusted admins---
    if _ask_yes_no("Would you like to insert trusted admins? ('yes'/'no'): ") == "yes":
        print("INFO: end insertion by typing 'end'")
        while True:
            admin_to_insert = str(input("Enter a trusted admin phone number: "))
            if admin_to_insert == "end":
                break
            dp.append_as_it(pocket_name="trusted_admins", what_to_append=admin_to_insert)
        dp.save()

    #---generate or change config file values---
    config_path = os.path.join(database_location, "config.py")
    if not os.path.exists(config_path):
        # Create an empty file without going through a shell.
        with open(config_path, "a", encoding="utf-8"):
            pass

    #---entering SMS keywords---
    if _ask_yes_no("Would you like to change SMS keywords? ('yes'/'no'): ") == "yes":
        keywords = []
        print("INFO: end insertion by typing 'end'")
        while True:
            keyword = str(input("Enter a keyword to detected for actioning actuator: "))
            if keyword == "end":
                # The terminator must not become a keyword itself.
                break
            if keyword:
                # An empty keyword would be contained in every SMS.
                keywords.append(keyword)
        # repr() yields a valid Python literal whatever the operator typed.
        _append_config_line(config_path, "KEYWORDS=" + repr(keywords))

    #---entering internet option password---
    if _ask_yes_no("Would you like to change internet option password? ('yes'/'no'): ") == "yes":
        passw = str(input("Enter the password: "))
        _append_config_line(config_path, "INTERNET_PWD=" + repr(passw))
|
|
|
|
def users_manager(lck, instruction="ADD", phone_number="000000000", duration=60):
    """Apply a user-management instruction to the database.

    lck: lock held for the whole read-modify-write of the database.
    instruction: "ADD", "REMOVE" or "UPGRADE"; only "ADD" does something,
        the other two are placeholders.
    phone_number: number the instruction applies to. Note that the default
        has 9 characters, which phone_number_to_long_format() does not
        recognise, so the default never matches an admin.
    duration: currently unused.

    "ADD" appends *phone_number* to the 'guests' pocket when its long format
    is listed in 'trusted_admins' or 'upgraded_to_admins'.
    NOTE(review): the original looked up an 'admins' pocket, which
    database_setter() never creates; 'trusted_admins' is assumed to be the
    intended one -- confirm.
    """
    with lck:
        dp = datasaver.DataPockets(filename="database_location_path")
        database_location = dp.get("database_location")[0]
        dp = datasaver.DataPockets(filename="database", directory_path=database_location)
        if instruction == "ADD":
            long_number = phone_number_to_long_format(phone_number)
            if long_number is not None and (
                long_number in dp.get("trusted_admins")
                or long_number in dp.get("upgraded_to_admins")
            ):
                dp.append_as_it(pocket_name="guests", what_to_append=phone_number)
                # database_setter() calls save() after its changes, so the
                # same is done here to persist the new guest.
                dp.save()
        elif instruction == "REMOVE":
            pass
        elif instruction == "UPGRADE":
            pass
|
|
|
|
def phone_number_to_long_format(number="0000000000"):
    """Return *number* in international ``+33XXXXXXXXX`` form, or None.

    Spaces, dots and dashes are ignored. Two shapes are recognised:
      * ``+`` followed by 11 digits: returned as is (without separators);
      * 10 digits starting with the trunk prefix ``0``: the ``0`` is
        replaced by ``+33``.
    Anything else, including non-string input, yields None.
    """
    if not isinstance(number, str):
        return None
    compact = "".join(char for char in number if char not in " .-")
    if len(compact) == 12 and compact[0] == "+" and compact[1:].isdigit():
        return compact
    # Require the leading 0 so that an arbitrary 10-character string does not
    # silently lose its first character.
    if len(compact) == 10 and compact[0] == "0" and compact.isdigit():
        return "+33" + compact[1:]
    return None
|
|
|
|
|
|
def opener(lck, url="http://192.168.0.103/open", attempts=5, timeout=5):
    """Ask the actuator to open by sending *attempts* GET requests to *url*.

    lck: lock held for the whole sequence so that concurrent triggers
        (remote, SMS, internet form) do not interleave their requests.
    url: actuator endpoint.
    attempts: number of requests sent (the request is repeated, presumably
        for redundancy -- the reason is not visible in this file).
    timeout: seconds to wait for each request. Without it an unreachable
        actuator would block forever while the lock is held.

    Network failures are reported on stdout and do not raise.
    """
    with lck:
        for _ in range(attempts):
            try:
                requests.get(url, timeout=timeout)
            except requests.RequestException as error:
                print("Unable to reach actuator:", error)
|
|
|
|
def localtunnel_launcher(port=8000, wanted_subdomain="open-domodoor", retry_delay=5):
    """Keep a localtunnel client (``lt``) running, restarting it when it exits.

    port: local port exposed through the tunnel.
    wanted_subdomain: subdomain requested from localtunnel.
    retry_delay: seconds to wait before restarting, so that an ``lt`` that
        exits immediately (or is not installed) does not cause a busy loop.

    The loop ends on KeyboardInterrupt. Python only raises KeyboardInterrupt
    in the main thread, so this never happens when the function runs in a
    worker thread.
    """
    # Argument list instead of a shell string: no quoting issues.
    command = ["lt", "--port", str(port), "--subdomain", str(wanted_subdomain)]
    while True:
        try:
            try:
                result = subprocess.run(args=command)
                print("Process returned:", result)
            except OSError as error:
                # Typically: the 'lt' executable was not found.
                print("Unable to start localtunnel:", error)
            time.sleep(retry_delay)
        except KeyboardInterrupt:
            break
|
|
|
|
def _find_remote():
    """Return the first input device that looks like the remote, else None."""
    try:
        input_devices = os.listdir("/dev/input")
    except OSError:
        return None
    for device in input_devices:
        try:
            dev = evdev.InputDevice("/dev/input/" + device)
        except Exception:
            # Entry that cannot be opened as an input device: skip it.
            continue
        try:
            is_remote = "Wireless Present" in dev.name and "Keyboard" in dev.name and "usb" in dev.phys
        except Exception:
            is_remote = False
        if is_remote:
            return dev
        # Release the descriptor of every device that is not the remote;
        # the scan is repeated every few seconds.
        try:
            dev.close()
        except Exception:
            pass
    return None


def remote_handler(lck):
    """Wait for the wireless remote and open the door on its 'B' key presses.

    lck: lock forwarded to opener().

    The function scans /dev/input for the remote, then blocks reading its
    events. When the remote cannot be found, or reading it fails, the
    attempt counter is incremented, a message is printed and the scan is
    retried after 3 seconds.
    """
    n_attempts = 0
    remote = None
    while remote is None:
        #---search for remote---
        remote = _find_remote()
        #---start retrieving events if remote detected---
        if remote is not None:
            print("Remote found")
            try:
                for event in remote.read_loop():
                    # value 1 is a key press (not a release or auto-repeat).
                    if event.type == evdev.ecodes.EV_KEY and event.code == evdev.ecodes.KEY_B and event.value == 1:
                        print("Command received from remote")
                        opener(lck)
            except Exception as error:
                # Presumably the remote was unplugged: drop it and rescan.
                print("Remote lost:", error)
                try:
                    remote.close()
                except Exception:
                    pass
                remote = None
        n_attempts += 1
        print("Attempt "+str(n_attempts)+" fails in finding remote")
        time.sleep(3)
|
|
|
|
#------ENDPOINTS------
|
|
|
|
@app.route(rule="/")
def index():
    """Serve the page rendered from the 'index.html' template."""
    page = flask.render_template("index.html")
    return page
|
|
|
|
@app.route(rule="/state")
def state_checker():
    """Liveness endpoint: always answers with the same plain-text line."""
    status_line = "200 OK (working)\n"
    return status_line
|
|
|
|
@app.route(rule="/from_internet", methods=["POST"])
def receiver_from_internet():
    """Open the door when the posted form carries the configured password.

    The request body is parsed as a URL-encoded form. When its
    'pwd_retriever' field equals ``config.INTERNET_PWD``, opener() is started
    in a background thread and 'success.html' is rendered; otherwise
    'fail.html' is rendered. A missing config or password setting is
    treated as a failed attempt.
    """
    data = flask.request.get_data(as_text=True)
    data_dict = urllib.parse.parse_qs(qs=data)
    # Log the field names only: the values include the submitted password.
    print(sorted(data_dict))

    try:
        expected = str(config.INTERNET_PWD)
    except (NameError, AttributeError):
        # config.py could not be imported or defines no password.
        return flask.render_template("fail.html")

    submitted = data_dict.get("pwd_retriever", [""])[0]
    # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
    password_ok = "pwd_retriever" in data_dict and hmac.compare_digest(
        submitted.encode("utf-8"), expected.encode("utf-8")
    )
    if password_ok:
        op = threading.Thread(target=opener, args=(actuator_locker,))
        op.start()
        return flask.render_template("success.html")
    return flask.render_template("fail.html")
|
|
|
|
@app.route(rule="/receiving", methods=["POST"])
def receiver():
    """Handle a notification posted by the SMS gateway.

    The body is parsed as a URL-encoded form (first value of each field).
    Depending on its 'action' field:
      * "incoming": opener() is started in a background thread when the
        message contains one of ``config.KEYWORDS`` (case-insensitive);
      * "outgoing", "send_status", "forward_sent", "amqp_started",
        "device_status": only acknowledged.
    Every recognised action is answered with a JSON object holding one "log"
    event; any other request is rejected with HTTP 400. Fields missing from
    the form are reported as "unknown" instead of failing the request.
    """
    data = flask.request.get_data(as_text=True)
    data_dict = {name: values[0] for name, values in urllib.parse.parse_qs(qs=data).items()}
    print(data_dict)
    action = data_dict.get("action")

    if action == "incoming":
        try:
            keywords = config.KEYWORDS
        except (NameError, AttributeError):
            # config.py could not be imported or defines no keywords.
            keywords = []
        message = data_dict.get("message", "").lower()
        # Empty keywords are skipped: "" is contained in every message.
        if any(keyword and str(keyword).lower() in message for keyword in keywords):
            op = threading.Thread(target=opener, args=(actuator_locker,))
            op.start()
        log_message = "Server received "+data_dict.get("message_type", "unknown")+" from "+data_dict.get("from", "unknown")
    elif action in ("outgoing", "send_status", "forward_sent", "amqp_started"):
        log_message = "Server received "+action+" action"
    elif action == "device_status":
        log_message = "Server received notification of "+data_dict.get("status", "unknown")
    else:
        return flask.abort(400)

    response_events = {"events": [{"event": "log", "message": log_message}]}
    return flask.jsonify(response_events)
|
|
|
|
|
|
if __name__ == "__main__":
    # Either run the interactive set-up, or reuse the database folder stored
    # by a previous set-up. In both cases the folder ends up first in
    # sys.path so that 'config.py' can be imported from it.
    if set_database == 1:
        database_setter()
    else:
        dp = datasaver.DataPockets(filename="database_location_path")
        try:
            database_location = dp.get("database_location")[0]
            sys.path.insert(0, database_location)
        except Exception:
            print("Unable to find database location folder")
            # sys.exit rather than the 'exit' helper injected by 'site'.
            sys.exit(-1)

    #---import config file---
    try:
        import config
    except Exception as error:
        # Deliberately not fatal: the remote control path does not use the
        # config. A malformed config.py ends up here as well.
        print("Unable to find config file", "(" + str(error) + ")")

    if launch_app == 1:
        # Daemon threads: both workers loop forever, so non-daemon threads
        # would keep the process alive after the web server stops.
        remote_handler_thread = threading.Thread(target=remote_handler, args=(actuator_locker,), daemon=True)
        remote_handler_thread.start()
        localtunnel_thread = threading.Thread(target=localtunnel_launcher, daemon=True)
        localtunnel_thread.start()
        app.run(host="0.0.0.0", port=8000)