1
0
Fork 0
Browse Source

begin moving code to 'services'.

I've began getting into a mess with trying to use duplicated code and
data. I've began to move functions to their own services folder and
files to help reduce the duplication.
stable
Craig Oates 3 years ago
parent
commit
25277572c0
  1. 78
      src/flicker.py

78
src/flicker.py

@ -1,10 +1,33 @@
import csv
import datetime
from services import io_services
def load_data():
data = dict()
with open("data/test-data-lite.csv") as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
for r in csv_reader:
if (r[1] != "Time Stamp"):
data[r[1]] = r[2] # Ignores the 'Id' column.
return data
def tally_readings_per_second2(data):
time_tallies = dict()
for k, v in data.items():
# print(f"{k} : {v}")
# time_str = k
if (k in time_tallies):
time_tallies[k] = (time_tallies[k]) + 1
print(f"Updated {time_tallies[k]}")
else:
time_tallies[k] = 1
# print(f"Added: {time_tallies[k]}")
# print(time_tallies)
return time_tallies
def tally_readings_per_second():
time_tallies = dict()
tally_totals = dict()
with open("data/test-data.csv") as csv_file:
with open("data/test-data-lite.csv") as csv_file:
csv_reader = csv.reader(csv_file, delimiter=",")
for r in csv_reader:
if (r[1] != "Time Stamp"):
@ -13,24 +36,53 @@ def tally_readings_per_second():
time_tallies[time_str] = (time_tallies[time_str]) + 1
else:
time_tallies[time_str] = 1
for r2 in time_tallies.values():
if (r2 in tally_totals):
tally_totals[r2] = (tally_totals[r2]) + 1
else:
tally_totals[r2] = 1
# print(tally_totals.items())
return time_tallies
def total_count_for_each_reading_per_second(time_tallies):
tally_totals = dict()
# print(time_tallies)
for val in time_tallies.values():
if (val in tally_totals):
tally_totals[val] = (tally_totals[val]) + 1
else:
tally_totals[val] = 1
# print(tally_totals.items())
return tally_totals
def save_tally_totals(tally_totals):
def save_rps_totals(totals):
with open("data/results/readings-per-sec.csv", mode="w") as result:
wtr = csv.writer(result)
for k, v in tally_totals.items():
print(f"{k}: {v}")
for k, v in totals.items():
# print(f"{k}: {v}")
wtr.writerow([k,v])
def get_rps_above(threshold, readings):
times = list()
# print(readings.values())
for k, v in readings.items():
# print([k, v])
if (v >= threshold):
times.append(k)
return times
def tally_flickers(selected_readings, all_readings):
selected = dict()
for time in selected_readings:
# print(f"Looking for {time}...")
selected[time] = all_readings[time]
# for k, v in all_readings.items():
return selected
def main():
time_tallies = tally_readings_per_second()
save_tally_totals(time_tallies)
# raw_data = load_data()
# time_tallies = tally_readings_per_second2(raw_data)
# # time_tallies = tally_readings_per_second()
# rps_totals = total_count_for_each_reading_per_second(time_tallies)
# save_rps_totals(rps_totals)
# rsp_above_two = get_rps_above(2, time_tallies)
# flicker_tallies = tally_flickers(rsp_above_two, time_tallies)
raw_data = io_services.load_raw_data("data/test-data-lite.csv")
print(raw_data)
if __name__ == "__main__":
main()