You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

204 lines
6.9 KiB

import datetime, json
import threading
import time
from itertools import zip_longest
import requests
from staticmap import StaticMap, CircleMarker
from colour import Color
import argparse
import oauthlib
import requests
from requests_oauthlib import OAuth1Session, OAuth1
token_url = 'https://account.api.here.com/oauth2/token'
matrix_url = "https://matrix.router.hereapi.com/v8/matrix"
matrix_status_url = "https://matrix.router.hereapi.com/v8/matrix/{0}/status"
matrix_retrieve_url = "https://matrix.router.hereapi.com/v8/matrix/{0}"
granularity = 0.0050
commute_range = 75
work_group_size = 100
color_scale_width = 60 # this is equal to minutes until full red saturation, 60 == 60 minutes
alpha = "70"
blot_size = 13
map_scale = 11
reverse = True
def get_bearer_token():
data = {"grant_type": "client_credentials"}
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
oauth = OAuth1(client_key, client_secret=client_secret)
r = requests.post(url=token_url, auth=oauth, headers=headers, data=data)
access_token = json.loads(r.content)['access_token']
return access_token
# Groups home_points into n groups, fills the non-fits with fillvalue
def group_elements(home_points, n, fillvalue=None):
args = [iter(home_points)] * n # this is some 9000IQ syntax
return zip_longest(*args, fillvalue=fillvalue)
# This is called with the
class Worker(threading.Thread):
def __init__(self, destination, coord_batch, graphical_map, access_token, backwards=False):
threading.Thread.__init__(self)
self.coord_batch = coord_batch
self.access_token = access_token
self.graphical_map = graphical_map
self.destination = destination
self.backwards = backwards
def run(self):
start_points = []
for idx, tup in enumerate(self.coord_batch):
if tup is not None:
start_points.append({'lat': tup[0], 'lng': tup[1]})
if self.backwards is True:
origins = [self.destination]
destinations = start_points
else:
origins = start_points
destinations = [self.destination]
request_payload = {
"origins": origins,
"destinations": destinations,
"regionDefinition": {
"type": "world",
# "type": "circle",
# "center": self.destination,
# "radius": 10000
}
}
headers = {
'Authorization': 'Bearer {}'.format(self.access_token),
'Content-Type': 'application/json'
}
response = requests.request("POST", matrix_url, headers=headers, data=json.dumps(request_payload))
post_response = json.loads(response.text)
matrix_id = post_response['matrixId']
while True:
response = requests.request("GET", matrix_status_url.format(matrix_id), headers=headers, allow_redirects=False)
jsons = json.loads(response.text)
if jsons['status'] == "inProgress" or jsons['status'] == "accepted":
time.sleep(10)
continue
elif jsons['status'] == "completed":
break
else:
print("big problems " + response.text)
break
headers = {
'Authorization': 'Bearer {}'.format(self.access_token),
# 'Content-Type': 'application/json',
'Accept - Encoding': 'gzip'
}
response = requests.request("GET", matrix_retrieve_url.format(matrix_id), headers=headers)
jsons = json.loads(response.text)
green = Color("green")
colors = list(green.range_to(Color("red"), color_scale_width))
for idx, tup in enumerate(self.coord_batch):
try:
timelen = jsons["matrix"]["travelTimes"][idx]
except:
continue
try:
if jsons["matrix"]["errorCodes"][idx] != 0:
error = True
else:
error = False
except:
error = False
pass
tup = (tup[1], tup[0])
col_val = int((timelen / 60)) # set the color bucket to the number of minutes
# so this means that full saturation will == the number of minutes when setting color_scale_width
if error is True:
marker = CircleMarker(tup, "blue", 6)
elif col_val >= color_scale_width:
# the + 70 here is just setting the aplha value
marker = CircleMarker(tup, Color("red").get_hex_l() + alpha, blot_size)
else:
if col_val == color_scale_width / 2:
marker = CircleMarker(tup, colors[col_val].get_hex_l() + "FF", blot_size)
else:
marker = CircleMarker(tup, colors[col_val].get_hex_l() + alpha, blot_size)
self.graphical_map.add_marker(marker)
print("Done with this one")
def entry():
access_token = get_bearer_token()
url = 'http://a.tile.osm.org/{z}/{x}/{y}.png'
url = 'https://cartodb-basemaps-b.global.ssl.fastly.net/light_nolabels/{z}/{x}/{y}.png '
graphical_map = StaticMap(3000, 3000, url_template=url)
# Since we are testing commute-to, we set the destination of "work"
destination_point = {'lat': 47.5121104, "lng": -121.8852897}
# Populate this list with an complete xy grid of points centered on destination_point
coord_grid = []
for x in range(-commute_range, commute_range):
for y in range(-commute_range, commute_range):
x_point = destination_point['lat'] + granularity * x
y_point = destination_point['lng'] + granularity * y
coord_grid.append((x_point, y_point))
coords_left_to_query = len(coord_grid)
# dispatch the workers to the pool
worker_pool = []
for coord_batch in group_elements(coord_grid, work_group_size):
w = Worker(destination_point, coord_batch, graphical_map, access_token, backwards=reverse)
w.start()
worker_pool.append(w)
# Print out our progress as wel join up the workers
for w in worker_pool:
w.join()
coords_left_to_query -= work_group_size
print("Coords left to query: {}".format(coords_left_to_query))
marker = CircleMarker((destination_point['lng'], destination_point['lat']), "red", 12)
graphical_map.add_marker(marker)
# finish up our work with the map
image = graphical_map.render(zoom=map_scale)
image.save('output-map.png')
# Program start
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--key", help="")
parser.add_argument("--secret", help="")
args = parser.parse_args()
client_key = args.key
client_secret = args.secret
entry()