Browse Source
Part of a move to clean-up flicker.py -- making it a place to call the functions in a way which is easy to 'switch' functions off via comments.stable
Craig Oates
3 years ago
2 changed files with 51 additions and 43 deletions
@ -1,56 +1,24 @@
|
||||
import csv |
||||
import datetime |
||||
from services import io_services, log_services |
||||
from services import io_services, log_services, data_services |
||||
# import pdb |
||||
|
||||
def tally_readings_per_second(data): |
||||
time_tallies = dict() |
||||
for item in data: |
||||
key = item[0] |
||||
if (key in time_tallies): |
||||
time_tallies[key] = time_tallies[key] + 1 |
||||
else: |
||||
time_tallies[key] = 1 |
||||
return time_tallies |
||||
|
||||
def total_count_for_each_reading_per_second(time_tallies): |
||||
tally_totals = dict() |
||||
for val in time_tallies.values(): |
||||
if (val in tally_totals): |
||||
tally_totals[val] = (tally_totals[val]) + 1 |
||||
else: |
||||
tally_totals[val] = 1 |
||||
return tally_totals |
||||
|
||||
def get_rps_above(threshold, readings): |
||||
times = list() |
||||
# print(readings.values()) |
||||
for k, v in readings.items(): |
||||
# print([k, v]) |
||||
if (v >= threshold): |
||||
times.append(k) |
||||
return times |
||||
|
||||
def tally_flickers(selected_readings, all_readings): |
||||
selected = dict() |
||||
for time in selected_readings: |
||||
# print(f"Looking for {time}...") |
||||
selected[time] = all_readings[time] |
||||
# for k, v in all_readings.items(): |
||||
return selected |
||||
|
||||
def main(): |
||||
raw_data_path = "data/test-data-lite.csv" |
||||
rps_save_path ="data/results/readings-per-sec.csv" |
||||
rps_save_path = "data/results/readings-per-sec.csv" |
||||
rps_above_thresh = "data/results/readings_above_threshold.csv" |
||||
raw_data = io_services.load_raw_data(raw_data_path) |
||||
# log_services.print_list(raw_data) |
||||
time_tallies = tally_readings_per_second(raw_data) |
||||
time_tallies = data_services.tally_readings_per_second(raw_data) |
||||
# log_services.print_dictionary(time_tallies) |
||||
rps_totals = total_count_for_each_reading_per_second(time_tallies) |
||||
rps_totals = data_services.total_count_for_each_reading_per_second(time_tallies) |
||||
# log_services.print_dictionary(rps_totals) |
||||
io_services.save_rps_totals(rps_totals, rps_save_path) |
||||
# rsp_above_two = get_rps_above(2, time_tallies) |
||||
# flicker_tallies = tally_flickers(rsp_above_two, time_tallies) |
||||
# io_services.save_rps_totals(rps_totals, rps_save_path) |
||||
rps_above_two = data_services.get_rps_above(2, time_tallies) |
||||
# log_services.print_list(rps_above_two) |
||||
io_services.save_rps_above_threshold(rps_above_two, rps_above_thresh) |
||||
# flicker_tallies = data_services.tally_flickers(rsp_above_two, raw_data) |
||||
# log_services.print_dictionary(flicker_tallies) |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
||||
|
@ -0,0 +1,40 @@
|
||||
from services import log_services |
||||
|
||||
def tally_readings_per_second(data): |
||||
time_tallies = dict() |
||||
for item in data: |
||||
key = item[0] |
||||
if (key in time_tallies): |
||||
time_tallies[key] = time_tallies[key] + 1 |
||||
else: |
||||
time_tallies[key] = 1 |
||||
return time_tallies |
||||
|
||||
def total_count_for_each_reading_per_second(time_tallies): |
||||
tally_totals = dict() |
||||
for val in time_tallies.values(): |
||||
if (val in tally_totals): |
||||
tally_totals[val] = (tally_totals[val]) + 1 |
||||
else: |
||||
tally_totals[val] = 1 |
||||
return tally_totals |
||||
|
||||
def get_rps_above(threshold, readings): |
||||
times = list() |
||||
# print(readings.values()) |
||||
for k, v in readings.items(): |
||||
# print([k, v]) |
||||
if (v >= threshold): |
||||
times.append(k) |
||||
return times |
||||
|
||||
def tally_flickers(selected_readings, all_readings): |
||||
selected = dict() |
||||
log_services.print_list(selected_readings) |
||||
# for time in selected_readings: |
||||
|
||||
# for time in selected_readings: |
||||
# # print(f"Looking for {time}...") |
||||
# selected[time] = all_readings[time] |
||||
# # for k, v in all_readings.items(): |
||||
return selected |
Reference in new issue