You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
3.9 KiB
131 lines
3.9 KiB
#!/usr/bin/python3 |
|
|
|
""" |
|
C.L.I. Meter
======================================================================
I wrote this script with the intention of using it as one part within
a single cron-job. The cron-job begins with "startup.sh" and that,
in turn, will call this script when it is ready to do so.
|
|
|
This script takes a reading of the current light-level in the room,
using the Raspberry Pi (and the light-meter attached to it) that it is
running on. From there, it sends a J.S.O.N. object to the server
(specified at "api_url"). The process is then repeated until it is
killed manually or via "shutdown.sh" (as a cron-job most likely).
|
|
|
You can use this script on Raspberry Pis without a G.U.I. because it
is all C.L.I. based. If you prefer something more graphical, you can
use "light_meter.py" -- which is part of this repository at time of
writing.
|
|
|
To end this script manually, use the usual "Ctrl-c" keyboard
interrupt -- or something like Htop... doesn't matter.
|
""" |
|
|
|
import json |
|
import RPi.GPIO as GPIO |
|
import time |
|
import math |
|
import requests |
|
from datetime import datetime |
|
import platform |
|
# import pdb # For testing. |
|
|
|
# Pin numbers in this file are Broadcom (BCM) channel names.
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(True)

# a_pin charges the capacitor; b_pin discharges it.
a_pin, b_pin = 18, 23

# One shared HTTP session so the REST calls reuse a persistent connection.
session = requests.Session()

# This machine's network name (platform.node()); used for the server's logs
# and to pick the API endpoint.
device_id = platform.node()
|
|
|
def get_api_url(device=None):
    """Return the REST endpoint that a device posts its readings to.

    Args:
        device: Host name to look up. When omitted, the module-level
            ``device_id`` (this machine's host name) is used.

    Returns:
        The "add reading" URL for the device, or ``None`` when the host
        name is not one of the known devices.
    """
    if device is None:
        device = device_id
    endpoints = {
        "factory1": "http://ritherdon.abbether.net/api/readings/add/1",
        "factory2": "http://ritherdon.abbether.net/api/readings/add/2",
    }
    # Unknown hosts get an explicit None instead of silently falling off
    # the end of an if/elif chain.
    return endpoints.get(device)
|
|
|
# Endpoint every reading is posted to. Resolved once, when the module loads;
# it is None when get_api_url() does not recognise this host name.
api_url = get_api_url()
|
|
|
def discharge():
    """Prepare for a measurement by discharging the capacitor.

    Switches a_pin (the charging pin) to an input, drives b_pin (the
    discharging pin) low as an output, then waits 10 ms. Returns nothing.
    analog_read() calls this immediately before charge_time().
    """
    GPIO.setup(a_pin, GPIO.IN)
    GPIO.setup(b_pin, GPIO.OUT)
    GPIO.output(b_pin, False)
    time.sleep(0.01)  # 0.01 -- initial/default value.
|
|
|
def charge_time():
    """Measure how long b_pin takes to read high once a_pin is driven high.

    Switches b_pin to an input and a_pin to an output, drives a_pin high,
    then busy-waits until b_pin reads high.

    Returns:
        The elapsed time as a float, in microseconds.

    Note:
        There is no timeout: the call blocks for as long as b_pin stays low.
    """
    GPIO.setup(b_pin, GPIO.IN)
    GPIO.setup(a_pin, GPIO.OUT)
    GPIO.output(a_pin, True)
    # perf_counter() is monotonic, so a system-clock adjustment (e.g. an NTP
    # sync) during the wait cannot produce a negative or inflated interval,
    # which time.time() could.
    started = time.perf_counter()
    while not GPIO.input(b_pin):
        pass
    elapsed_seconds = time.perf_counter() - started
    return elapsed_seconds * 1000000
|
|
|
def analog_read():
    """Take one raw sample: run discharge(), then return charge_time()'s result."""
    discharge()
    elapsed = charge_time()
    return elapsed
|
|
|
def read_resistance(samples=20, reader=None):
    """Average several raw samples and convert the mean to a resistance value.

    Args:
        samples: How many raw samples to average (default 20).
        reader: Zero-argument callable that returns one raw sample.
            Defaults to ``analog_read``.

    Returns:
        ``mean_sample * 6.05 - 939`` as a float.

    Raises:
        ValueError: If ``samples`` is less than 1.
    """
    if samples < 1:
        raise ValueError("samples must be at least 1")
    if reader is None:
        reader = analog_read
    # Take exactly `samples` readings. The previous loop, range(1, n), took
    # only n - 1 readings but still divided by n, so every mean came out
    # 1/n (5% at n = 20) too low.
    total = sum(reader() for _ in range(samples))
    reading = total / float(samples)
    # Linear conversion kept exactly as it was. NOTE(review): 6.05 and 939
    # look like calibration values for this circuit -- confirm they do not
    # need re-tuning now that the mean is no longer under-reported.
    return reading * 6.05 - 939
|
|
|
def light_from_r(R):
    """Convert a resistance value into the light level this script reports.

    Computes ``ln(1000000 / R) * 10``. ``R`` must be positive: zero raises
    ZeroDivisionError and a negative value raises ValueError from math.log.
    """
    ratio = 1000000.0 / R
    return math.log(ratio) * 10.0
|
|
|
def get_timestamp():
    """Return the current local time as a "YYYY-MM-DD HH:MM:SS" string."""
    stamp_format = "%Y-%m-%d %H:%M:%S"
    now = datetime.now()
    return now.strftime(stamp_format)
|
|
|
def push_reading(lvalue, timeout=10):
    """POST one light reading to the server at ``api_url``.

    Args:
        lvalue: Light level; truncated to an int before it is sent.
        timeout: Seconds to wait for the server before requests raises
            ``requests.exceptions.Timeout``. Without a timeout the call can
            block forever on an unresponsive server.

    Returns:
        The ``requests.Response`` for the POST. The status code is not
        checked here.
    """
    # Deliberately not named "time": that would shadow the time module
    # inside this function.
    timestamp = get_timestamp()
    headers = {"content-type": "application/json"}
    # NOTE(review): the token is hard-coded and is echoed by the print()
    # below; consider loading it from configuration instead.
    payload = {"reading": int(lvalue), "time": timestamp,
               "token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
    print(payload)  # For testing.
    # Uses the shared module-level session so the connection is reused.
    return session.post(api_url, data=json.dumps(payload), headers=headers,
                        timeout=timeout)
|
|
|
def update_reading():
    """Take one light reading and send it to the server.

    Reads the averaged resistance, converts it to a light level and passes
    that to push_reading(). Exceptions from any of those calls propagate to
    the caller.
    """
    # The unused "reading_str" formatting step was dropped; push_reading()
    # does its own int() conversion of the value.
    light = light_from_r(read_resistance())
    push_reading(light)
|
|
|
|
|
def main():
    """Take and upload readings in a loop until interrupted or a fatal error.

    Network failures (request timeouts and connection errors) are reported,
    followed by a pause, after which the loop resumes. A keyboard interrupt
    or any other exception ends the loop. GPIO.cleanup() always runs on the
    way out.
    """
    pause = 60  # Seconds to back off after a network failure.
    try:
        while True:
            # pdb.set_trace() # For testing.
            # Recoverable errors are handled inside the loop so that the
            # script carries on after the pause; previously the handlers
            # sat outside the loop, so the script slept and then exited.
            try:
                update_reading()
            except requests.exceptions.Timeout:
                # Checked before ConnectionError so that connect time-outs
                # (which are both) are reported as time-outs.
                # "datetime" is the class here (from datetime import
                # datetime), so datetime.datetime.now() would raise
                # AttributeError.
                t_stamp = datetime.now()
                print(f"[WARNING] TIMEOUT EXCEPTION: Request timed-out at {t_stamp}.")
                time.sleep(pause)
            except requests.exceptions.ConnectionError:
                # Announce the pause before sleeping, and sleep for the
                # advertised duration.
                print(f"[WARNING] MAX. REQUESTS EXCEEDED: Pausing requests for {pause} seconds...")
                time.sleep(pause)
    except KeyboardInterrupt:
        print("[INFO] KEYBOARD INTERRUPT: quitting program.")
    except Exception as e:
        # Top-level boundary: report anything unexpected, then shut down.
        print(f"[ERROR] GENERAL EXCEPTION: {e}")
    finally:
        print("[INFO] Terminating relay.py...")
        print("[INFO] Cleaning up GPIO before closing...")
        GPIO.cleanup()
|
|
|
|
|
# Run the reading loop only when this file is executed as a script, not
# when it is imported.
if __name__ == "__main__":
    # time.sleep(60) # For testing/debugging.
    main()
|
|
|