Source code for nextrip

"""
.. _reference: http://svc.metrotransit.org/

A NexTrip API wrapper. See the NexTrip API `reference`_ for more information.
"""

import logging
import json
import time
import urllib.request

from enum import IntEnum
from functools import wraps
from typing import Union, Callable


__version__ = '1.0.1'

# Cache lifespan constants in seconds
GENERAL_CACHE_LIFESPAN = 3600
DEPARTURES_CACHE_LIFESPAN = 30

DEBUG = False


[docs]class CacheEntry: """ A cache entry object that contains cached data, a lifespan, and a property that returns whether or not the entry has expired. """
[docs] class CacheExpiredException(Exception): """Thrown if an entry was expired upon retrieval.""" def __init__(self): super().__init__('cache entry expired')
def __init__(self, lifespan: int, *, initial=None, debug=False): """ Creates a cache entry with the given lifespan. :param lifespan: number of seconds until the cache data expires :param initial: initial data to add to the cache """ # Debug handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('[CE]: %(message)s')) self._logger = logging.getLogger(str(id(self))) self._logger.addHandler(handler) if debug: self._logger.setLevel(logging.DEBUG) self._data = initial self.last_fetched = time.time() if initial else 0 self.lifespan = lifespan @property def data(self): """The entry's cached data value""" if self.expired: self._logger.debug("cache entry expired (last_fetched %s)", self.last_fetched) raise self.CacheExpiredException return self._data @data.setter def data(self, new_data): self.last_fetched = time.time() self._data = new_data self._logger.debug("cache data set (last_fetched %s)", self.last_fetched) @property def expired(self) -> bool: """Whether or not the cache entry has expired""" return time.time() - self.last_fetched > self.lifespan
[docs] def hook(self, cache_bust: bool, callback: Callable, *args, **kwargs) -> 'CacheEntry': """ :param cache_bust: forces the callback to be made, overwriting any cached data :param callback: the function to be called with ``args`` and ``kwargs`` :Returns: the entry's data. If it has expired, it will be updated first with the return value of the callback. Additional (positional) arguments are used as parameters for the callback if it is needed. """ if self.expired or cache_bust: self._logger.debug("using callback (last_fetched %s)", self.last_fetched) self.data = callback(*args, **kwargs) return self.data
[docs]class NexTrip: """ .. |route ID| replace:: :meth:`route ID <routes>` .. |direction| replace:: :class:`direction <NexTrip.Directions>` .. |stop ID| replace:: `stop ID <https://gisdata.mn.gov/dataset/ us-mn-state-metc-trans-transit-schedule-google-fd>`__ A NexTrip API wrapper object. This provides coverage of all of the endpoints available and respects the caching requests as stated on the NexTrip API `reference`_. The return types of these endpoint methods are those returned by ``json.loads``. This will raise `urllib <https://docs.python.org/3/library/urllib.error.html>`_ and `json <https://docs.python.org/3/library/json.html#exceptions>`_ exceptions. More information regarding return types and constants can be found in the `reference`_. .. note:: All endpoint methods that include a ``cache_bust`` keyword argument provide the option to bypass the cache by setting it to ``True``. Example usage: .. code-block:: python :linenos: # Create a NexTrip wrapper object from nextrip import NexTrip nt = NexTrip() # METRO Blue cardinal directions dirs = nt.directions(901) # Get stops in the first direction provided stops = nt.stops(901, dirs[0]['Value']) # Print the last stop's code and name print('{0[Value]}: {0[Text]}'.format(stops[-1])) For a more extensive example, see the :func:`demo` source code. """
[docs] class Directions(IntEnum): """ Integer enumeration of available cardinal directions. These are defined on the NexTrip API `reference`_. """ SOUTH, EAST, WEST, NORTH = range(1, 5) # Why def __str__(self): return str(self.value)
_base = 'http://svc.metrotransit.org/NexTrip/' _headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } def __init__(self, *, debug=False): """Creates a NexTrip API wrapper object with caches.""" # Debug self._use_debug = debug handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('[NT]: %(message)s')) self._logger = logging.getLogger(str(id(self))) self._logger.addHandler(handler) if debug: self._logger.setLevel(logging.DEBUG) self._providers_cache = CacheEntry(GENERAL_CACHE_LIFESPAN, debug=self._use_debug) self._routes_cache = CacheEntry(GENERAL_CACHE_LIFESPAN, debug=self._use_debug) self._directions_cache = {} self._stops_cache = {} self._departures_cache = {} self._timepoint_departures_cache = {} @staticmethod def _get_cache_key(*args): """Creates a cache key from the given arguments.""" return ':::'.join(str(it) for it in args) def _endpoint_method(method): """Adds logging calls to each endpoint method""" @wraps(method) def _decorated(self, *args, **kwargs): self._logger.debug("getting %s", method.__name__) return method(self, *args, **kwargs) return _decorated def _setdefault_entry(self, cache, key, lifespan=GENERAL_CACHE_LIFESPAN): """ Returns a :class:`.CacheEntry` in the given cache with the given key if it exists. If it does not exist, it is efficiently created with the given lifespan. Similar to ``dict.setdefault``, but does not always create a :class:`.CacheEntry` object. """ entry = cache.get(key) if not entry: entry = CacheEntry(lifespan, debug=self._use_debug) cache[key] = entry return entry def _request(self, path): req = urllib.request.Request(self._base + path, headers=self._headers) self._logger.debug("requesting %s", req.full_url) with urllib.request.urlopen(req) as response: return json.loads(response.read())
[docs] @_endpoint_method def providers(self, cache_bust=False): """:Returns: a list of provider names and their respective provider IDs.""" return self._providers_cache.hook(cache_bust, self._request, 'Providers')
[docs] @_endpoint_method def routes(self, cache_bust=False): """:Returns: a list of routes and their respective route IDs.""" return self._routes_cache.hook(cache_bust, self._request, 'Routes')
[docs] @_endpoint_method def directions(self, route: Union[str, int], cache_bust=False): """ :param route: a |route ID| :Returns: the pair of directions allowed for the given route. """ entry = self._setdefault_entry(self._directions_cache, self._get_cache_key(route)) return entry.hook(cache_bust, self._request, f'Directions/{route}')
[docs] @_endpoint_method def stops( self, route: Union[str, int], direction: Union[str, int, 'NexTrip.Directions'], cache_bust=False): """ :param route: a |route ID| :param direction: a cardinal |direction| number :Returns: a list of stops along the route in the direction specified. Results include the stop name and 4 character stop code. """ entry = self._setdefault_entry(self._stops_cache, self._get_cache_key(route, direction)) return entry.hook(cache_bust, self._request, f'Stops/{route}/{direction}')
[docs] @_endpoint_method def departures(self, stop_id: int, cache_bust=False): """ :param stop_id: a |stop ID|. :Returns: a list of departures scheduled for the given |stop ID|. """ entry = self._setdefault_entry( self._departures_cache, self._get_cache_key(stop_id), lifespan=DEPARTURES_CACHE_LIFESPAN) return entry.hook(cache_bust, self._request, f'Departures/{stop_id}')
[docs] @_endpoint_method def timepoint_departures( self, route: Union[str, int], direction: Union[str, int, 'NexTrip.Directions'], stop: str, cache_bust=False): """ :param route: a |route ID| :param direction: a cardinal |direction| number :param stop: a 4 character stop code :Returns: a list of departures from the given arguments. """ entry = self._setdefault_entry( self._timepoint_departures_cache, self._get_cache_key(route, direction, stop), lifespan=DEPARTURES_CACHE_LIFESPAN) return entry.hook(cache_bust, self._request, f'{route}/{direction}/{stop}')
[docs] @_endpoint_method def vehicle_locations(self, route: Union[str, int]): """ :param route: a |route ID| :Returns: the vehicles and their properties for the given route. """ return self._request(f'VehicleLocations/{route}')
[docs]def demo(route_name: str, stop_name: str, direction_name: str, debug=DEBUG, session=None): """ Demos the functionality of the wrapper. :param route_name: substring of the bus route name :param stop_name: substring of the bus stop name :param direction_name: a cardinal direction name :param NexTrip session: an existing NexTrip object can be provided to preserve cache entries :Returns: the number of minutes until the next bus at the given stop going in the given direction will leave. """ if session: nt = session else: nt = NexTrip(debug=DEBUG) # Debug logger = logging.getLogger('demo') if not logger.hasHandlers(): handler = logging.StreamHandler() handler.setFormatter(logging.Formatter('[Demo]: %(message)s')) logger.addHandler(handler) if debug or nt._use_debug: logger.setLevel(logging.DEBUG) # Get route logger.debug("getting route %s", route_name) routes = nt.routes() route = None for it in routes: if route_name in it['Description']: route = it['Route'] break if not route: raise Exception("Route not found") logger.debug("found route %s", route) # Convert direction logger.debug("getting direction %s", direction_name) direction = { 'south': NexTrip.Directions.SOUTH, 'east': NexTrip.Directions.EAST, 'west': NexTrip.Directions.WEST, 'north': NexTrip.Directions.NORTH }.get(direction_name) if not direction: raise Exception("Invalid direction") logger.debug("found direction %s", direction) # Get stop code logger.debug("getting stop %s", stop_name) stops = nt.stops(route, direction) stop = None for it in stops: if stop_name in it['Text']: stop = it['Value'] break if not stop: raise Exception("Stop not found") logger.debug("found stop %s", stop) # Get departures logger.debug("getting departures") departures = nt.timepoint_departures(route, direction, stop) if not departures: return None logger.debug("found departures") # Get latest departure and the time until the bus leaves logger.debug("getting departure time") raw_time = departures[0]['DepartureTime'] logger.debug("parsing time %s", raw_time) seconds = int(raw_time[6:].split('-')[0]) / 1000 logger.debug("time parsed to %s", seconds) remaining = (seconds - time.time()) / 60 if remaining <= 0: return None # Always have one minute remaining at minimum return round(remaining) or 1
if __name__ == '__main__': import sys if len(sys.argv) != 4: usage = ( "Usage:\n\tpython3 nextrip.py <route name> <stop name> <direction>\n\n\t" "Valid direction names: north, east, south, west") print(usage, file=sys.stderr) sys.exit(2) try: result = demo(*sys.argv[1:]) except Exception as e: print("Error: {}".format(e.args[0]), file=sys.stderr) sys.exit(1) if result: print("{} Minute{}".format(result, '' if result == 1 else 's')) sys.exit(0)