main.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. from class_xml import XML
  2. import config
  3. import notification
  4. from itertools import chain
  5. import datetime
  class Event:
      """Common base class for the event types defined in this file.

      The base class holds no state and defines no behavior of its own.
      """

      def __init__(self):
          """Create an event; nothing is initialised at this level."""
  class Event_ratp(Event):
      """Event whose ``source`` is ``'ratp_traffic'``.

      Attributes:
          source: always the string ``'ratp_traffic'``.
          id: the identifier passed as ``ident``.
          status: the status passed in; ``'normal'`` means no problem.
          message: the text to send when the event is a problem.
      """

      def __init__(self, ident, status, message):
          """Store the identifier, status and message of the event."""
          self.source, self.id = 'ratp_traffic', ident
          self.status, self.message = status, message

      def problem(self):
          """Return True when the stored status is anything but ``'normal'``."""
          is_problem = self.status != 'normal'
          return is_problem
  def ratp_traffic():
      """Yield one ``Event_ratp`` per ``div.encadre_ligne`` element of the page.

      The page at http://www.ratp.fr/meteo/ is loaded through ``XML`` with
      ``lang="html"``. For each selected element the event is built from:

      * the element's ``id`` attribute (event identifier),
      * the ``alt`` attribute of its ``img`` child (status),
      * the text of its first ``span.perturb_message`` descendant (message).

      NOTE(review): ``XML(...).data`` is presumably a BeautifulSoup-like
      tree (it exposes ``select``); confirm against ``class_xml``.
      """
      page = XML(url="http://www.ratp.fr/meteo/", lang="html")
      for block in page.data.select('div.encadre_ligne'):
          ident = block['id']
          status = block.img['alt']
          # Only the first matching span is used; no match raises IndexError.
          message = block.select('span.perturb_message')[0].string
          yield Event_ratp(ident, status, message)
  class Event_jcdecaux_vls_full(Event):
      """Event raised when a bike station has few free stands left.

      Attributes:
          source: always the string ``'jcdecaux_vls'``.
          id: the station identifier followed by ``"_full"``.
          places: number of free stands, as an integer.
          threshold: ``problem()`` is true when ``places <= threshold``.
          message: French notification text including station name and date.
      """

      def __init__(self, ident, nom, timestamp, places, threshold=4):
          """Build the event.

          Args:
              ident: station identifier (string or integer).
              nom: station name; it is lower-cased in the message.
              timestamp: last-update time. It is divided by 1000 before being
                  converted, so it is presumably in milliseconds since the
                  epoch -- TODO confirm against the API documentation.
              places: number of free stands (string or integer).
              threshold: highest number of free stands still reported as a
                  problem. Defaults to 4, the previously hard-coded value.
                  TODO: have the caller read it from config.
          """
          self.source = 'jcdecaux_vls'
          # str() so that an integer identifier does not break concatenation.
          self.id = str(ident) + "_full"
          self.places = int(places)
          self.threshold = threshold
          # fromtimestamp() without tz renders the date in local time.
          date = datetime.datetime.fromtimestamp(int(timestamp) / 1000).strftime('à %Hh%M le %d/%m')
          # Use the parsed count so integer input no longer raises TypeError.
          self.message = ('Station vélo ' + nom.lower() + ' ' + date
                          + ' : plus que ' + str(self.places) + ' places disponibles !')

      def problem(self):
          """Return True when the free-stand count is at or below the threshold."""
          return self.places <= self.threshold
  class Event_jcdecaux_vls_empty(Event):
      """Event raised when a bike station has few bikes left.

      Attributes:
          source: always the string ``'jcdecaux_vls'``.
          id: the station identifier followed by ``"_empty"``.
          bikes: number of available bikes, as an integer.
          threshold: ``problem()`` is true when ``bikes <= threshold``.
          message: French notification text including station name and date.
      """

      def __init__(self, ident, nom, timestamp, bikes, threshold=4):
          """Build the event.

          Args:
              ident: station identifier (string or integer).
              nom: station name; it is lower-cased in the message.
              timestamp: last-update time. It is divided by 1000 before being
                  converted, so it is presumably in milliseconds since the
                  epoch -- TODO confirm against the API documentation.
              bikes: number of available bikes (string or integer).
              threshold: highest number of bikes still reported as a problem.
                  Defaults to 4, the previously hard-coded value.
                  TODO: have the caller read it from config.
          """
          self.source = 'jcdecaux_vls'
          # str() so that an integer identifier does not break concatenation.
          self.id = str(ident) + "_empty"
          self.bikes = int(bikes)
          self.threshold = threshold
          # fromtimestamp() without tz renders the date in local time.
          date = datetime.datetime.fromtimestamp(int(timestamp) / 1000).strftime('à %Hh%M le %d/%m')
          # Use the parsed count so integer input no longer raises TypeError.
          self.message = ('Station vélo ' + nom.lower() + ' ' + date
                          + ' : plus que ' + str(self.bikes) + ' vélos !')

      def problem(self):
          """Return True when the bike count is at or below the threshold."""
          return self.bikes <= self.threshold
  def jcdecaux_vls():
      """Yield a "full" and an "empty" event for every configured bike station.

      Station numbers are taken from ``config.events['jcdecaux_vls']``: each
      configured id is cut at its first ``_`` and duplicates are dropped, so a
      station listed under several ids is only fetched once. When the
      ``'jcdecaux_vls'`` source is not configured at all, nothing is yielded.

      For each station the JCDecaux endpoint is queried (contract ``paris``,
      key from ``config.api_key['jcdecaux']``) and two events are produced,
      in this order: ``Event_jcdecaux_vls_full`` built from
      ``available_bike_stands`` and ``Event_jcdecaux_vls_empty`` built from
      ``available_bikes``.

      NOTE(review): ``XML(..., lang="json").data.json`` is presumably a
      tag tree built from the JSON response; confirm against ``class_xml``.
      """
      # .get() mirrors the lookup in the notification loop of this file: a
      # source missing from the configuration must not raise KeyError.
      configured_ids = config.events.get('jcdecaux_vls', [])
      stations = {event_id.split('_')[0] for event_id in configured_ids}
      for station in stations:
          url = ("https://api.jcdecaux.com/vls/v1/stations/" + station
                 + "?contract=paris&apiKey=" + config.api_key['jcdecaux'])
          tag = XML(url=url, lang="json").data.json
          # Both events share the same identifier, name and update time.
          number = tag.number.string
          name = tag.find('name').string
          last_update = tag.last_update.string
          yield Event_jcdecaux_vls_full(number, name, last_update, tag.available_bike_stands.string)
          yield Event_jcdecaux_vls_empty(number, name, last_update, tag.available_bikes.string)
  # Collect the events of every source, then notify for each one that is
  # both listed in config.events under its source and currently a problem.
  events = chain(ratp_traffic(), jcdecaux_vls())
  for event in events:
      # Events whose id is not configured for their source are ignored.
      if event.id not in config.events.get(event.source, []):
          continue
      if event.problem():
          notification.notify(event.message)