1
0
Fork 0
Browse Source

extend URL list for adding new light readings.

The base URL uses ritherdon.abbether.net now, in the main part of the
python code. I have added checks to see which device is running the
code and forms a complete URL for the API call.

The way the URL is formed for the API call is not the most elegant
solution but is was quick to write and the code has a limited
life-span. It will not require any further modification when it goes
live. So, the speed it took to write it was a good trade-off in my opinion.
pull/1/head
Craig Oates 3 years ago
parent
commit
6e4e4254cf
  1. 41
      cli_meter.py

41
cli_meter.py

@ -24,26 +24,42 @@ interrupt -- or something like Htop... doesn't matter.
import json import json
import RPi.GPIO as GPIO import RPi.GPIO as GPIO
import time, math import time
import math
import requests import requests
from datetime import datetime from datetime import datetime
import platform
# import pdb # For testing. # import pdb # For testing.
# Using BCM (Broadcom) names when referencing the GPIO pins. # Using BCM (Broadcom) names when referencing the GPIO pins.
GPIO.setmode(GPIO.BCM) GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(True) GPIO.setwarnings(True)
a_pin = 18 # Charges the capacitor. a_pin = 18 # Charges the capacitor.
b_pin = 23 # Discharges the capacitor. b_pin = 23 # Discharges the capacitor.
device_id = platform.node() # For servers logs.
def get_api_url():
# Yes, I know I could do this better. Stop moaning.
if (device_id == "factory1"):
return "http://ritherdon.abbether.net/api/readings/add/1"
elif (device_id == "factory2"):
return "http://ritherdon.abbether.net/api/readings/add/2"
# Make sure this is valid. # Make sure this is valid.
api_url = "http://3.9.19.84/api/readings/add/1" # api_url = "http://ritherdon.abbether.net/api/readings/add/1"
api_url = get_api_url()
def discharge(): def discharge():
GPIO.setup(a_pin, GPIO.IN) GPIO.setup(a_pin, GPIO.IN)
GPIO.setup(b_pin, GPIO.OUT) GPIO.setup(b_pin, GPIO.OUT)
GPIO.output(b_pin, False) GPIO.output(b_pin, False)
time.sleep(0.01) # 0.01 -- initial/default value. time.sleep(0.01) # 0.01 -- initial/default value.
def charge_time(): def charge_time():
GPIO.setup(b_pin, GPIO.IN) GPIO.setup(b_pin, GPIO.IN)
@ -55,42 +71,50 @@ def charge_time():
t2 = time.time() t2 = time.time()
return (t2 - t1) * 1000000 return (t2 - t1) * 1000000
def analog_read(): def analog_read():
discharge() discharge()
return charge_time() return charge_time()
def read_resistance(): def read_resistance():
n = 20 n = 20
total = 0; total = 0
for i in range(1, n): for i in range(1, n):
total = total + analog_read() total = total + analog_read()
reading = total / float(n) reading = total / float(n)
resistance = reading * 6.05 - 939 resistance = reading * 6.05 - 939
return resistance return resistance
def light_from_r(R): def light_from_r(R):
return math.log(1000000.0/R) * 10.0 return math.log(1000000.0/R) * 10.0
def get_timestamp(): def get_timestamp():
return datetime.now().strftime(("%Y-%m-%d %H:%M:%S")) return datetime.now().strftime(("%Y-%m-%d %H:%M:%S"))
def push_reading(lvalue): def push_reading(lvalue):
time = get_timestamp() time = get_timestamp()
headers = {"content-type": "application/json"} headers = {"content-type": "application/json"}
payload = { "reading": int(lvalue), "time": time, "token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"} payload = {"reading": int(lvalue), "time": time,
"token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
# print(payload) # For testing. # print(payload) # For testing.
res = requests.post(api_url, data=json.dumps(payload), headers=headers) res = requests.post(api_url, data=json.dumps(payload), headers=headers)
def update_reading(): def update_reading():
light = light_from_r(read_resistance()) light = light_from_r(read_resistance())
reading_str = "{:.0f}".format(light) reading_str = "{:.0f}".format(light)
# print(reading_str) # For testing. # print(reading_str) # For testing.
push_reading(light) push_reading(light)
def main(): def main():
try: try:
while True: while True:
#pdb.set_trace() # For testing. # pdb.set_trace() # For testing.
update_reading() update_reading()
except KeyboardInterrupt: except KeyboardInterrupt:
print("Keyboard Interrupt: quitting program.") print("Keyboard Interrupt: quitting program.")
@ -100,6 +124,7 @@ def main():
print("Cleaning up GPIO before closing...") print("Cleaning up GPIO before closing...")
GPIO.cleanup() GPIO.cleanup()
if __name__ == "__main__": if __name__ == "__main__":
# time.sleep(60) # For testing/debugging. # time.sleep(60) # For testing/debugging.
main() main()