1
0
Fork 0
Browse Source

extend URL list for adding new light readings.

The base URL uses ritherdon.abbether.net now, in the main part of the
python code. I have added checks to see which device is running the
code and forms a complete URL for the API call.

The way the URL is formed for the API call is not the most elegant
solution but is was quick to write and the code has a limited
life-span. It will not require any further modification when it goes
live. So, the speed it took to write it was a good trade-off in my opinion.
pull/1/head
Craig Oates 3 years ago
parent
commit
6e4e4254cf
  1. 41
      cli_meter.py

41
cli_meter.py

@ -24,26 +24,42 @@ interrupt -- or something like Htop... doesn't matter.
import json
import RPi.GPIO as GPIO
import time, math
import time
import math
import requests
from datetime import datetime
import platform
# import pdb # For testing.
# Using BCM (Broadcom) names when referencing the GPIO pins.
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(True)
a_pin = 18 # Charges the capacitor.
b_pin = 23 # Discharges the capacitor.
a_pin = 18 # Charges the capacitor.
b_pin = 23 # Discharges the capacitor.
device_id = platform.node() # For servers logs.
def get_api_url():
# Yes, I know I could do this better. Stop moaning.
if (device_id == "factory1"):
return "http://ritherdon.abbether.net/api/readings/add/1"
elif (device_id == "factory2"):
return "http://ritherdon.abbether.net/api/readings/add/2"
# Make sure this is valid.
api_url = "http://3.9.19.84/api/readings/add/1"
# api_url = "http://ritherdon.abbether.net/api/readings/add/1"
api_url = get_api_url()
def discharge():
GPIO.setup(a_pin, GPIO.IN)
GPIO.setup(b_pin, GPIO.OUT)
GPIO.output(b_pin, False)
time.sleep(0.01) # 0.01 -- initial/default value.
time.sleep(0.01) # 0.01 -- initial/default value.
def charge_time():
GPIO.setup(b_pin, GPIO.IN)
@ -55,42 +71,50 @@ def charge_time():
t2 = time.time()
return (t2 - t1) * 1000000
def analog_read():
discharge()
return charge_time()
def read_resistance():
n = 20
total = 0;
total = 0
for i in range(1, n):
total = total + analog_read()
reading = total / float(n)
resistance = reading * 6.05 - 939
return resistance
def light_from_r(R):
return math.log(1000000.0/R) * 10.0
def get_timestamp():
return datetime.now().strftime(("%Y-%m-%d %H:%M:%S"))
def push_reading(lvalue):
time = get_timestamp()
headers = {"content-type": "application/json"}
payload = { "reading": int(lvalue), "time": time, "token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
payload = {"reading": int(lvalue), "time": time,
"token": "QWERTYuiopasdfghjklzxcvbnm_1234567890"}
# print(payload) # For testing.
res = requests.post(api_url, data=json.dumps(payload), headers=headers)
def update_reading():
light = light_from_r(read_resistance())
reading_str = "{:.0f}".format(light)
# print(reading_str) # For testing.
push_reading(light)
def main():
try:
while True:
#pdb.set_trace() # For testing.
# pdb.set_trace() # For testing.
update_reading()
except KeyboardInterrupt:
print("Keyboard Interrupt: quitting program.")
@ -100,6 +124,7 @@ def main():
print("Cleaning up GPIO before closing...")
GPIO.cleanup()
if __name__ == "__main__":
# time.sleep(60) # For testing/debugging.
main()