domotik/sim800l.py
2022-12-15 12:15:38 +01:00

127 lines
4.2 KiB
Python

import os,time,sys
import serial
def convert_to_string(buf):
try:
tt = buf.decode('utf-8').strip()
return tt
except UnicodeError:
tmp = bytearray(buf)
for i in range(len(tmp)):
if tmp[i]>127:
tmp[i] = ord('#')
return bytes(tmp).decode('utf-8').strip()
class SIM800L:
def __init__(self,ser="/dev/serial0"):
try:
self.ser=serial.Serial(ser, baudrate=9600, timeout=1)
except Exception as e:
sys.exit("Error: {}".format(e))
self.incoming_action = None
self.no_carrier_action = None
self.clip_action = None
self._clip = None
self.msg_action = None
self._msgid = 0
self.savbuf = None
def setup(self):
self.command('ATE0\n') # command echo off
self.command('AT+CFUN=1\n') # set
self.command('AT+CLIP=1\n') # caller line identification
self.command('AT+CMGF=1\n') # plain text SMS
self.command('AT+CLTS=1\n') # enable get local timestamp mode
self.command('AT+CSCLK=0\n') # disable automatic sleep
self.command('AT+CNMI=2,1,0,0,0\n') # allow new SMS notification as +CMTI: "SM",<index> where <index> is location of SMS
def callback_incoming(self,action):
self.incoming_action = action
def callback_no_carrier(self,action):
self.no_carrier_action = action
def get_clip(self):
return self._clip
def callback_msg(self,action):
self.msg_action = action
def get_msgid(self):
return self._msgid
def command(self, cmdstr, lines=1, waitfor=500, msgtext=None):
while self.ser.in_waiting:
self.ser.readline()
self.ser.write(cmdstr.encode())
if msgtext:
self.ser.write(msgtext.encode())
if waitfor>1000:
time.sleep((waitfor-1000)/1000)
buf=self.ser.readline() #discard linefeed etc
#print(buf)
buf=self.ser.readline()
if not buf:
return None
result = convert_to_string(buf)
if lines>1:
self.savbuf = ''
for i in range(lines-1):
buf=self.ser.readline()
if not buf:
return result
buf = convert_to_string(buf)
if not buf == '' and not buf == 'OK':
self.savbuf += buf+'\n'
return result
def send_sms(self,destno,msgtext):
result = self.command('AT+CMGS="{}"\n'.format(destno),99,5000,msgtext+'\x1A')
if result and result=='>' and self.savbuf:
params = self.savbuf.split(':')
if params[0]=='+CUSD' or params[0] == '+CMGS':
return 'OK'
return 'ERROR'
def read_sms(self,id):
result = self.command('AT+CMGR={}\n'.format(id),99)
if result:
params=result.split(',')
if not params[0] == '':
params2 = params[0].split(':')
if params2[0]=='+CMGR':
number = params[1].replace('"',' ').strip()
date = params[3].replace('"',' ').strip()
reception_time = params[4].replace('"',' ').strip()
return [number,date,reception_time,self.savbuf]
return None
def delete_sms(self,id):
self.command('AT+CMGD={}\n'.format(id),1)
def check_incoming(self):
if self.ser.in_waiting:
buf=self.ser.readline()
# print(buf)
buf = convert_to_string(buf)
params=buf.split(',')
if params[0][0:5] == "+CMTI":
self._msgid = int(params[1])
if self.msg_action:
self.msg_action()
elif params[0] == "NO CARRIER":
self.no_carrier_action()
elif params[0] == "RING" or params[0][0:5] == "+CLIP":
#@todo handle
pass
def read_and_delete_all(self):
try:
return self.read_sms(1)
finally:
self.command('AT+CMGDA="DEL ALL"\n',1)
#-------------------Callback functions--------------------------------