You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
3.9 KiB
132 lines
3.9 KiB
3 years ago
|
#!/usr/bin/python3
|
||
|
|
||
|
"""
|
||
|
C.L.I. Meter
|
||
|
======================================================================
|
||
|
I wrote this script with the intention of using it as one part within
|
||
|
a single cron-job. The cron-job begins with "startup.sh" and that,
|
||
|
in-turn, will call this script when it is ready to do so.
|
||
|
|
||
|
This scrip takes a reading of the current light-level in the room,
|
||
|
using the Raspberry Pi (and the attached light-meter attached) it is
|
||
|
running on. From there, it sends a J.S.O.N. object to the server
|
||
|
(specified at "api_url"). The process is then repeated until it is
|
||
|
killed manually or via "shutdown.sh" (as a cron-job most likely).
|
||
|
|
||
|
You can use this script on Raspberry Pis without a G.U.I. because it
|
||
|
is all C.L.I. based. If you prefer something more graphical, you can
|
||
|
use "light_meter.py" -- which is part of this repository at time of
|
||
|
writing.
|
||
|
|
||
|
To end this script manually, use the usual "Ctrl-c" keyboard
|
||
|
interrupt -- or something like Htop... doesn't matter.
|
||
|
"""
|
||
|
|
||
|
import json
|
||
|
import RPi.GPIO as GPIO
|
||
|
import time
|
||
|
import math
|
||
|
import requests
|
||
|
from datetime import datetime
|
||
|
import platform
|
||
|
# import pdb # For testing.
|
||
|
|
||
|
# Using BCM (Broadcom) names when referencing the GPIO pins.
|
||
|
GPIO.setmode(GPIO.BCM)
|
||
|
GPIO.setwarnings(True)
|
||
|
|
||
|
a_pin = 18 # Charges the capacitor.
|
||
|
b_pin = 23 # Discharges the capacitor.
|
||
|
|
||
|
device_id = platform.node() # For servers logs.
|
||
|
session = requests.Session() # Persist connection for REST calls.
|
||
|
|
||
|
def get_api_url():
|
||
|
# Yes, I know I could do this better. Stop moaning.
|
||
|
if (device_id == "factory1"):
|
||
|
return "http://ritherdon.abbether.net/api/readings/add/1"
|
||
|
elif (device_id == "factory2"):
|
||
|
return "http://ritherdon.abbether.net/api/readings/add/2"
|
||
|
|
||
|
api_url = get_api_url()
|
||
|
|
||
|
def discharge():
|
||
|
GPIO.setup(a_pin, GPIO.IN)
|
||
|
GPIO.setup(b_pin, GPIO.OUT)
|
||
|
GPIO.output(b_pin, False)
|
||
|
time.sleep(0.01) # 0.01 -- initial/default value.
|
||
|
|
||
|
def charge_time():
|
||
|
GPIO.setup(b_pin, GPIO.IN)
|
||
|
GPIO.setup(a_pin, GPIO.OUT)
|
||
|
GPIO.output(a_pin, True)
|
||
|
t1 = time.time()
|
||
|
while not GPIO.input(b_pin):
|
||
|
pass
|
||
|
t2 = time.time()
|
||
|
return (t2 - t1) * 1000000
|
||
|
|
||
|
def analog_read():
|
||
|
discharge()
|
||
|
return charge_time()
|
||
|
|
||
|
def read_resistance():
|
||
|
n = 20
|
||
|
total = 0
|
||
|
for i in range(1, n):
|
||
|
total = total + analog_read()
|
||
|
reading = total / float(n)
|
||
|
resistance = reading * 6.05 - 939
|
||
|
return resistance
|
||
|
|
||
|
def light_from_r(R):
|
||
|
return math.log(1000000.0/R) * 10.0
|
||
|
|
||
|
def get_timestamp():
|
||
|
return datetime.now().strftime(("%Y-%m-%d %H:%M:%S"))
|
||
|
|
||
|
def push_reading(lvalue):
|
||
|
time = get_timestamp()
|
||
|
headers = {"content-type": "application/json"}
|
||
|
payload = {"reading": int(lvalue), "time": time,
|
||
|
"token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
|
||
|
print(payload) # For testing.
|
||
|
# res = requests.post(api_url, data=json.dumps(payload), headers=headers)
|
||
|
res = session.post(api_url, data=json.dumps(payload), headers=headers)
|
||
|
|
||
|
def update_reading():
|
||
|
light = light_from_r(read_resistance())
|
||
|
reading_str = "{:.0f}".format(light)
|
||
|
# print(reading_str) # For testing.
|
||
|
push_reading(light)
|
||
|
|
||
|
|
||
|
def main():
|
||
|
try:
|
||
|
while True:
|
||
|
# pdb.set_trace() # For testing.
|
||
|
update_reading()
|
||
|
except KeyboardInterrupt:
|
||
|
print("[INFO] KEYBOARD INTERRUPT: quitting program.")
|
||
|
except requests.exceptions.ConnectionError:
|
||
|
pause = 60
|
||
|
time.sleep(60)
|
||
|
print(f"[WARNING] MAX. REQUESTS EXCEEDED: Pausing requests for {pause} seconds...")
|
||
|
pass
|
||
|
except requests.exceptions.Timeout:
|
||
|
t_stamp = datetime.datetime.now()
|
||
|
print(f"[WARNING] TIMEOUT EXCEPTION: Request timed-out at {t_stamp}.")
|
||
|
time.sleep(60)
|
||
|
pass
|
||
|
except Exception as e:
|
||
|
print(f"[ERROR] GENERAL EXCEPTION: {e}")
|
||
|
finally:
|
||
|
print("[INFO] Terminating relay.py...")
|
||
|
print("[INFO] Cleaning up GPIO before closing...")
|
||
|
GPIO.cleanup()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
# time.sleep(60) # For testing/debugging.
|
||
|
main()
|