1
0
Fork 0
A Python based project which measures the light levels of an environment and forwards those measurements on to a server. This project is one of several which forms the software-side of the 'Return to Ritherdon' project.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.
 
 

131 lines
3.9 KiB

#!/usr/bin/python3
"""
C.L.I. Meter
======================================================================
I wrote this script with the intention of using it as one part within
a single cron-job. The cron-job begins with "startup.sh" and that,
in-turn, will call this script when it is ready to do so.
This scrip takes a reading of the current light-level in the room,
using the Raspberry Pi (and the attached light-meter attached) it is
running on. From there, it sends a J.S.O.N. object to the server
(specified at "api_url"). The process is then repeated until it is
killed manually or via "shutdown.sh" (as a cron-job most likely).
You can use this script on Raspberry Pis without a G.U.I. because it
is all C.L.I. based. If you prefer something more graphical, you can
use "light_meter.py" -- which is part of this repository at time of
writing.
To end this script manually, use the usual "Ctrl-c" keyboard
interrupt -- or something like Htop... doesn't matter.
"""
import json
import RPi.GPIO as GPIO
import time
import math
import requests
from datetime import datetime
import platform
# import pdb # For testing.
# Using BCM (Broadcom) names when referencing the GPIO pins.
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(True)
a_pin = 18 # Charges the capacitor.
b_pin = 23 # Discharges the capacitor.
device_id = platform.node() # For servers logs.
session = requests.Session() # Persist connection for REST calls.
def get_api_url():
# Yes, I know I could do this better. Stop moaning.
if (device_id == "factory1"):
return "http://ritherdon.abbether.net/api/readings/add/1"
elif (device_id == "factory2"):
return "http://ritherdon.abbether.net/api/readings/add/2"
api_url = get_api_url()
def discharge():
GPIO.setup(a_pin, GPIO.IN)
GPIO.setup(b_pin, GPIO.OUT)
GPIO.output(b_pin, False)
time.sleep(0.01) # 0.01 -- initial/default value.
def charge_time():
GPIO.setup(b_pin, GPIO.IN)
GPIO.setup(a_pin, GPIO.OUT)
GPIO.output(a_pin, True)
t1 = time.time()
while not GPIO.input(b_pin):
pass
t2 = time.time()
return (t2 - t1) * 1000000
def analog_read():
discharge()
return charge_time()
def read_resistance():
n = 20
total = 0
for i in range(1, n):
total = total + analog_read()
reading = total / float(n)
resistance = reading * 6.05 - 939
return resistance
def light_from_r(R):
return math.log(1000000.0/R) * 10.0
def get_timestamp():
return datetime.now().strftime(("%Y-%m-%d %H:%M:%S"))
def push_reading(lvalue):
time = get_timestamp()
headers = {"content-type": "application/json"}
payload = {"reading": int(lvalue), "time": time,
"token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
print(payload) # For testing.
# res = requests.post(api_url, data=json.dumps(payload), headers=headers)
res = session.post(api_url, data=json.dumps(payload), headers=headers)
def update_reading():
light = light_from_r(read_resistance())
reading_str = "{:.0f}".format(light)
# print(reading_str) # For testing.
push_reading(light)
def main():
try:
while True:
# pdb.set_trace() # For testing.
update_reading()
except KeyboardInterrupt:
print("[INFO] KEYBOARD INTERRUPT: quitting program.")
except requests.exceptions.ConnectionError:
pause = 60
time.sleep(60)
print(f"[WARNING] MAX. REQUESTS EXCEEDED: Pausing requests for {pause} seconds...")
pass
except requests.exceptions.Timeout:
t_stamp = datetime.datetime.now()
print(f"[WARNING] TIMEOUT EXCEPTION: Request timed-out at {t_stamp}.")
time.sleep(60)
pass
except Exception as e:
print(f"[ERROR] GENERAL EXCEPTION: {e}")
finally:
print("[INFO] Terminating relay.py...")
print("[INFO] Cleaning up GPIO before closing...")
GPIO.cleanup()
if __name__ == "__main__":
# time.sleep(60) # For testing/debugging.
main()