diff --git a/README.md b/README.md index 61ff8ff..4179844 100644 --- a/README.md +++ b/README.md @@ -1 +1,7 @@ -# infinity_tracker \ No newline at end of file +#Infinity Tracker +Inspired by infinitude (https://github.com/nebulous/infinitude/). Please give them all credit for the idea. + +Leverages https://github.com/inaz2/proxy2 + +####Usage +python tracker.py -h diff --git a/proxy.py b/proxy.py new file mode 100644 index 0000000..b325456 --- /dev/null +++ b/proxy.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# This proxies connections for a Carrier Infinity system. +# It reads the data being transferred and logs it to an influxdb server +import gzip +import httplib +import json +import os +import re +import select +import socket +import ssl +import sys +import threading +import time +import urlparse +import zlib +from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler +from SocketServer import ThreadingMixIn +from cStringIO import StringIO +from subprocess import Popen, PIPE +from HTMLParser import HTMLParser + + +def with_color(c, s): + return "\x1b[%dm%s\x1b[0m" % (c, s) + + +class ThreadingHTTPServer(ThreadingMixIn, HTTPServer): + address_family = socket.AF_INET6 + daemon_threads = True + + def handle_error(self, request, client_address): + # surpress socket/ssl related errors + cls, e = sys.exc_info()[:2] + if cls is socket.error or cls is ssl.SSLError: + pass + else: + return HTTPServer.handle_error(self, request, client_address) + + +class ProxyRequestHandler(BaseHTTPRequestHandler): + cakey = 'ca.key' + cacert = 'ca.crt' + certkey = 'cert.key' + certdir = 'certs/' + timeout = 5 + lock = threading.Lock() + + def __init__(self, *args, **kwargs): + self.tls = threading.local() + self.tls.conns = {} + + BaseHTTPRequestHandler.__init__(self, *args, **kwargs) + + def log_error(self, format, *args): + # surpress "Request timed out: timeout('timed out',)" + if isinstance(args[0], socket.timeout): + return + + self.log_message(format, *args) + + def do_CONNECT(self): + if (os.path.isfile(self.cakey) + and os.path.isfile(self.cacert) + and os.path.isfile(self.certkey) + and os.path.isdir(self.certdir)): + self.connect_intercept() + else: + self.connect_relay() + + def connect_intercept(self): + hostname = self.path.split(':')[0] + certpath = "%s/%s.crt" % (self.certdir.rstrip('/'), hostname) + + with self.lock: + if not os.path.isfile(certpath): + epoch = "%d" % (time.time() * 1000) + p1 = Popen(["openssl", "req", "-new", "-key", self.certkey, + "-subj", "/CN=%s" % hostname], stdout=PIPE) + p2 = Popen(["openssl", "x509", "-req", "-days", "3650", "-CA", + self.cacert, "-CAkey", self.cakey, "-set_serial", + epoch, "-out", certpath], stdin=p1.stdout, stderr=PIPE) # NOQA + p2.communicate() + + self.wfile.write("%s %d %s\r\n" % (self.protocol_version, + 200, + 'Connection Established') + ) + self.end_headers() + + self.connection = ssl.wrap_socket(self.connection, + keyfile=self.certkey, + certfile=certpath, + server_side=True + ) + self.rfile = self.connection.makefile("rb", self.rbufsize) + self.wfile = self.connection.makefile("wb", self.wbufsize) + + conntype = self.headers.get('Proxy-Connection', '') + if conntype.lower() == 'close': + self.close_connection = 1 + elif (conntype.lower() == 'keep-alive' + and self.protocol_version >= "HTTP/1.1"): + self.close_connection = 0 + + def connect_relay(self): + address = self.path.split(':', 1) + address[1] = int(address[1]) or 443 + try: + s = socket.create_connection(address, timeout=self.timeout) + except Exception: + self.send_error(502) + return + self.send_response(200, 'Connection Established') + self.end_headers() + + conns = [self.connection, s] + self.close_connection = 0 + while not self.close_connection: + rlist, wlist, xlist = select.select(conns, [], conns, self.timeout) + if xlist or not rlist: + break + for r in rlist: + other = conns[1] if r is conns[0] else conns[0] + data = r.recv(8192) + if not data: + self.close_connection = 1 + break + other.sendall(data) + + def do_GET(self): + if self.path == 'http://proxy2.test/': + self.send_cacert() + return + + req = self + content_length = int(req.headers.get('Content-Length', 0)) + req_body = self.rfile.read(content_length) if content_length else None + + if req.path[0] == '/': + if isinstance(self.connection, ssl.SSLSocket): + req.path = "https://%s%s" % (req.headers['Host'], req.path) + else: + req.path = "http://%s%s" % (req.headers['Host'], req.path) + + req_body_modified = self.request_handler(req, req_body) + if req_body_modified is False: + self.send_error(403) + return + elif req_body_modified is not None: + req_body = req_body_modified + req.headers['Content-length'] = str(len(req_body)) + + u = urlparse.urlsplit(req.path) + scheme, netloc = u.scheme, u.netloc + path = (u.path + '?' + u.query if u.query else u.path) + assert scheme in ('http', 'https') + if netloc: + req.headers['Host'] = netloc + req_headers = self.filter_headers(req.headers) + + try: + origin = (scheme, netloc) + if origin not in self.tls.conns: + if scheme == 'https': + ConnClass = httplib.HTTPSConnection + self.tls.conns[origin] = ConnClass(netloc, + timeout=self.timeout) + else: + ConnClass = httplib.HTTPConnection + self.tls.conns[origin] = ConnClass(netloc, + timeout=self.timeout) + conn = self.tls.conns[origin] + conn.request(self.command, path, req_body, dict(req_headers)) + res = conn.getresponse() + res_body = res.read() + except Exception: + if origin in self.tls.conns: + del self.tls.conns[origin] + self.send_error(502) + return + + version_table = {10: 'HTTP/1.0', 11: 'HTTP/1.1'} + setattr(res, 'headers', res.msg) + setattr(res, 'response_version', version_table[res.version]) + + content_encoding = res.headers.get('Content-Encoding', 'identity') + res_body_plain = self.decode_content_body(res_body, content_encoding) + + res_body_modified = self.response_handler(req, + req_body, + res, + res_body_plain) + if res_body_modified is False: + self.send_error(403) + return + elif res_body_modified is not None: + res_body_plain = res_body_modified + res_body = self.encode_content_body(res_body_plain, + content_encoding) + res.headers['Content-Length'] = str(len(res_body)) + + res_headers = self.filter_headers(res.headers) + + msg_write = "%s %d %s\r\n" % (self.protocol_version, + res.status, res.reason) + self.wfile.write(msg_write) + for line in res_headers.headers: + self.wfile.write(line) + self.end_headers() + self.wfile.write(res_body) + self.wfile.flush() + + with self.lock: + self.save_handler(req, req_body, res, res_body_plain) + + do_HEAD = do_GET + do_POST = do_GET + do_OPTIONS = do_GET + + def filter_headers(self, headers): + # http://tools.ietf.org/html/rfc2616#section-13.5.1 + hop_by_hop = ('connection', 'keep-alive', 'proxy-authenticate', + 'proxy-authorization', 'te', 'trailers', + 'transfer-encoding', 'upgrade') + for k in hop_by_hop: + del headers[k] + return headers + + def encode_content_body(self, text, encoding): + if encoding == 'identity': + data = text + elif encoding in ('gzip', 'x-gzip'): + io = StringIO() + with gzip.GzipFile(fileobj=io, mode='wb') as f: + f.write(text) + data = io.getvalue() + elif encoding == 'deflate': + data = zlib.compress(text) + else: + raise Exception("Unknown Content-Encoding: %s" % encoding) + return data + + def decode_content_body(self, data, encoding): + if encoding == 'identity': + text = data + elif encoding in ('gzip', 'x-gzip'): + io = StringIO(data) + with gzip.GzipFile(fileobj=io) as f: + text = f.read() + elif encoding == 'deflate': + try: + text = zlib.decompress(data) + except zlib.error: + text = zlib.decompress(data, -zlib.MAX_WBITS) + else: + raise Exception("Unknown Content-Encoding: %s" % encoding) + return text + + def send_cacert(self): + with open(self.cacert, 'rb') as f: + data = f.read() + + self.wfile.write("%s %d %s\r\n" % (self.protocol_version, 200, 'OK')) + self.send_header('Content-Type', 'application/x-x509-ca-cert') + self.send_header('Content-Length', len(data)) + self.send_header('Connection', 'close') + self.end_headers() + self.wfile.write(data) + + def print_info(self, req, req_body, res, res_body): + def parse_qsl(s): + return '\n'.join("%-20s %s" % (k, v) for k, v in urlparse.parse_qsl(s, keep_blank_values=True)) # NOQA + + req_header_text = "%s %s %s\n%s" % (req.command, req.path, req.request_version, req.headers) # NOQA + res_header_text = "%s %d %s\n%s" % (res.response_version, res.status, res.reason, res.headers) # NOQA + + print with_color(33, req_header_text) + + u = urlparse.urlsplit(req.path) + if u.query: + query_text = parse_qsl(u.query) + print with_color(32, "==== QUERY PARAMETERS ====\n%s\n" % query_text) # NOQA + + cookie = req.headers.get('Cookie', '') + if cookie: + cookie = parse_qsl(re.sub(r';\s*', '&', cookie)) + print with_color(32, "==== COOKIE ====\n%s\n" % cookie) + + auth = req.headers.get('Authorization', '') + if auth.lower().startswith('basic'): + token = auth.split()[1].decode('base64') + print with_color(31, "==== BASIC AUTH ====\n%s\n" % token) + + if req_body is not None: + req_body_text = None + content_type = req.headers.get('Content-Type', '') + + if content_type.startswith('application/x-www-form-urlencoded'): + req_body_text = parse_qsl(req_body) + elif content_type.startswith('application/json'): + try: + json_obj = json.loads(req_body) + json_str = json.dumps(json_obj, indent=2) + if json_str.count('\n') < 50: + req_body_text = json_str + else: + lines = json_str.splitlines() + req_body_text = "%s\n(%d lines)" % ('\n'.join(lines[:50]), len(lines)) # NOQA + except ValueError: + req_body_text = req_body + elif len(req_body) < 1024: + req_body_text = req_body + + if req_body_text: + print with_color(32, "==== REQUEST BODY ====\n%s\n" % req_body_text) # NOQA + + print with_color(36, res_header_text) + + cookies = res.headers.getheaders('Set-Cookie') + if cookies: + cookies = '\n'.join(cookies) + print with_color(31, "==== SET-COOKIE ====\n%s\n" % cookies) + + if res_body is not None: + res_body_text = None + content_type = res.headers.get('Content-Type', '') + + if content_type.startswith('application/json'): + try: + json_obj = json.loads(res_body) + json_str = json.dumps(json_obj, indent=2) + if json_str.count('\n') < 50: + res_body_text = json_str + else: + lines = json_str.splitlines() + res_body_text = "%s\n(%d lines)" % ( + '\n'.join(lines[:50]), + len(lines) + ) + except ValueError: + res_body_text = res_body + elif content_type.startswith('text/html'): + m = re.search(r']*>\s*([^<]+?)\s*', + res_body, re.I) + if m: + h = HTMLParser() + + s = m.group(1).decode('utf-8') + s = "==== HTML TITLE ====\n%s\n" % h.unescape(s) + print with_color(32, s) + elif content_type.startswith('text/') and len(res_body) < 1024: + res_body_text = res_body + elif content_type.startswith('application/xml'): + res_body_text = res_body + + if res_body_text: + s = "==== RESPONSE BODY ====\n%s\n" % res_body_text + print with_color(32, s) + + def request_handler(self, req, req_body): + pass + + def response_handler(self, req, req_body, res, res_body): + pass + + def save_handler(self, req, req_body, res, res_body): + self.print_info(req, req_body, res, res_body) + + +def test(HandlerClass=ProxyRequestHandler, ServerClass=ThreadingHTTPServer, + protocol="HTTP/1.1"): + # Set up clean logging to stderr + server_address = ('', '8080') + + HandlerClass.protocol_version = protocol + httpd = ServerClass(server_address, HandlerClass) + + sa = httpd.socket.getsockname() + print "Serving HTTP Proxy on", sa[0], "port", sa[1], "..." + httpd.serve_forever() + + +if __name__ == '__main__': + test() diff --git a/tracker.py b/tracker.py new file mode 100644 index 0000000..34566f6 --- /dev/null +++ b/tracker.py @@ -0,0 +1,360 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# This proxies connections for a Carrier Infinity system. +# It reads the data being transferred and logs it to an influxdb server +import argparse +import json +import logging +import proxy +import re +import requests +import urllib2 +import urlparse +import xml.etree.ElementTree as ET + +from utils import _escape_tag, _escape_value, extract_req_body + +logger = logging.getLogger() +logging.getLogger("requests").setLevel(logging.WARNING) +logging.getLogger("urllib3").setLevel(logging.WARNING) + + +# global for passing influxdb URL +influxdb = None + +# global for passing wunderground api key and location +api_key = None +location_query = None + + +# Registration of routes +routes = { + 'save': {}, + 'request': {}, + 'response': {} +} + + +def route(rule, f_type='save'): + """A decorator that is used to register a view function for a + given URL rule. + + f_type allows specifying whether it handles a request, a response, + or a save + """ + def decorator(f): + routes[f_type][rule] = f + return f + return decorator + + +def get_current_temp(api_key, location_query): + if not (api_key and location_query): + return False + url = 'http://api.wunderground.com/api/{}/geolookup/conditions/q/{}.json' + url = url.format(api_key, location_query) + f = urllib2.urlopen(url) + json_string = f.read() + parsed_json = json.loads(json_string) + temp_f = parsed_json['current_observation']['temp_f'] + f.close() + return temp_f + + +def status_handler(update_text, sn='Unknown'): + if not influxdb: + return + try: + root = ET.fromstring(update_text) + except Exception as e: # NOQA + logger.exception('Failed to parse request: %s', + update_text) + return + sn = _escape_tag(sn) + lines = [] + value = _escape_value(float(root.find('filtrlvl').text)) + lines.append('filter,sn={} value={}'.format(sn, value)) + unit_mode = _escape_value(root.find('mode').text) + lines.append('mode,sn={} value={}'.format(sn, unit_mode)) + zones = [] + known_tags = { + 'enabled': None, + 'currentActivity': 'activity', + 'rt': 'temp', + 'rh': 'humidity', + 'fan': 'fan', + 'htsp': 'heat_set_point', + 'clsp': 'cool_set_point', + 'hold': 'hold', + 'name': None, + 'otmr': None, + } + transforms = { + 'temp': float, + 'humidity': float, + 'heat_set_point': float, + 'cool_set_point': float, + 'fan': lambda val: val != 'off', # Converts to boolean + 'hold': lambda val: val != 'off' # Converts to boolean + } + + for zone_set in root.findall('zones'): + for zone in zone_set.findall('zone'): + if zone.find('enabled').text == 'off': + continue + hvac_zone = { + 'zone_id': _escape_tag(zone.attrib['id']), + 'name': _escape_tag(zone.find('name').text), + 'attributes': {} + } + for tag, key in known_tags.items(): + node = zone.find(tag) + if node is None: + logger.debug('Could not find tag %s in body: %s', tag, + zone.find(tag)) + continue + value = node.text or '0' + transform = transforms.get(key, str) + value = transform(value) + hvac_zone['attributes'][key] = _escape_value(value) + zones.append(hvac_zone) + for child in zone: + if child.tag not in known_tags: + logger.info('Unknown tag: %s: %s', child.tag, child.text) + for zone in zones: + templ = 'sn={},zone={},zone_id={}'.format(sn, zone['name'], + zone['zone_id']) + for field, value in zone['attributes'].items(): + if not field: + continue + if isinstance(value, float): + line = '{},{} value={}'.format(field, templ, value) + else: + line = '{},{} value={}'.format(field, templ, value) + lines.append(line) + logger.debug(unit_mode) + logger.debug(unit_mode == '"cool"') + if unit_mode == '"cool"' or unit_mode == '"dehumidify"': + logger.debug('Cooling') + field = 'cooling' + value = zone['attributes']['temp'] + line = '{},{} value={}'.format(field, templ, value) + lines.append(line) + if unit_mode == '"heat"': + field = 'heating' + value = zone['attributes']['temp'] + line = '{},{} value={}'.format(field, templ, value) + lines.append(line) + + headers = { + 'Content-Type': 'application/octet-stream', + 'Accept': 'text/plain' + } + try: + temp_f = get_current_temp(api_key, location_query) + if temp_f: + lines.append('outside_temp,sn={} value={}'.format(sn, temp_f)) + except Exception as e: + logger.exception('Failed to get current temp: %s', e) + lines = '\n'.join(lines) + lines = lines + '\n' + # logger.debug('Submitting %s', lines) + r = requests.post(influxdb, headers=headers, data=lines) + logging.getLogger('requests').debug(r.text) + return + + +@route('/systems/(?P.*)/status', 'request') +def systems_status_req_handler(req, req_body, sn): + """Handle save requests for systems status.""" + content_length = req.headers.get('Content-Length', '') + if content_length == 0: + logger.debug('Status check') + else: + req_body_text = None + content_type = req.headers.get('Content-Type', '') + if content_type.startswith('application/x-www-form-urlencoded'): + if req_body is not None: + req_body_text = extract_req_body(req_body) + status_handler(req_body_text, sn) + + +@route('/systems/(?P.*)/status') +def systems_status_save_handler(req, req_body, res, res_body, sn): + """Handle save requests for systems status.""" + content_type = res.headers.get('Content-Type', '') + + if content_type.startswith('application/xml'): + try: + root = ET.fromstring(res_body) + server_has_changes = root.find('serverHasChanges').text + if server_has_changes != 'false': + logger.debug('Remote changes') + else: + logger.debug('No remote changes') + except Exception as e: # NOQA + logger.exception('Failed to parse response: %s', res_body) + return True + + +@route('/systems/(?P.*)') +def config_handler(req, req_body, res, res_body, sn): + """Handle system config updates.""" + logger.info('System config update') + return True + + +@route('/systems/(?P.*)/idu_config') +def idu_config_handler(req, req_body, res, res_body, sn): + """Handle InDoor Unit config updates.""" + logger.info('InDoor Unit config update') + pass + + +@route('/systems/(?P.*)/odu_config') +def odu_config_handler(req, req_body, res, res_body, sn): + """Handle OutDoor Unit config updates.""" + logger.info('OutDoor Unit config update') + pass + + +@route('/systems/(?P.*)/idu_status') +def idu_status_handler(req, req_body, res, res_body, sn): + """Handle InDoor Unit status updates.""" + logger.info('InDoor Unit status update') + pass + + +@route('/systems/(?P.*)/odu_status') +def odu_status_handler(req, req_body, res, res_body, sn): + """Handle OutDoor Unit status updates.""" + logger.info('OutDoor Unit status update') + pass + + +@route('/Alive') +def alive_handler(req, req_body, res, res_body): + """Handles Alive calls.""" + logger.info('Alive called') + return True + + +@route('/weather/(?P.*)/forecast') +def forecast_handler(req, req_body, res, res_body, zip): + """Handles forecast requests""" + return True + + +class CarrierProxyRequestHandler(proxy.ProxyRequestHandler): + def request_handler(self, req, req_body): + """Used to modify requests.""" + u = urlparse.urlsplit(req.path) + path = u.path + handler = None + handler_routes = routes['request'] + for route in handler_routes: + route_re = '^{}$'.format(route) # Find exact matches only + m = re.match(route_re, path) + if m: + handler = handler_routes[route] + # From https://stackoverflow.com/q/11065419 + # Convert match elements to kw args + handler(req, req_body, **m.groupdict()) + pass + + def response_handler(self, req, req_body, res, res_body): + """Used to modify responses.""" + u = urlparse.urlsplit(req.path) + path = u.path + handler = None + handler_routes = routes['response'] + for route in handler_routes: + route_re = '^{}$'.format(route) # Find exact matches only + m = re.match(route_re, path) + if m: + handler = handler_routes[route] + # From https://stackoverflow.com/q/11065419 + # Convert match elements to kw args + handler(req, req_body, res, res_body, **m.groupdict()) + pass + + def save_handler(self, req, req_body, res, res_body): + squelch_output = False + u = urlparse.urlsplit(req.path) + path = u.path + handler = None + handler_routes = routes['save'] + for route in handler_routes: + route_re = '^{}$'.format(route) # Find exact matches only + m = re.match(route_re, path) + if m: + logger.debug('Found a save handler for %s', path) + handler = handler_routes[route] + # From https://stackoverflow.com/q/11065419 + # Convert match elements to kw args + squelch_output = handler(req, req_body, res, res_body, + **m.groupdict()) + if not squelch_output: + self.print_info(req, req_body, res, res_body) + if handler is None: + logger.info('Unknown save path: %s', path) + return + + +def main(): + parser = argparse.ArgumentParser(description='Proxy server.') + parser.add_argument('-p', '--port', default=8080, type=int, + help='Port to listen on') + parser.add_argument('-a', '--address', default='', type=str, + help='Address to listen on') + parser.add_argument('-s', '--server', type=str, + default='', + help='InfluxDB Server DSN') + parser.add_argument('--api_key', type=str, + help='Weather Underground API Key') + parser.add_argument('--location', type=str, + help='Weather Underground location query.') + log_choices = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] + parser.add_argument('-l', '--log', dest='logLevel', default='INFO', + choices=log_choices, help='Set the logging level') + args = parser.parse_args() + # Set up clean logging to stderr + log_level = getattr(logging, args.logLevel) + datefmt = '%m/%d/%Y %H:%M:%S' + log_format = '%(asctime)s' + if (args.logLevel == 'DEBUG'): + log_format = '{} %(filename)s'.format(log_format) + log_format = '{} %(funcName)s:%(lineno)d'.format(log_format) + log_format = '{} %(levelname)s: %(message)s'.format(log_format) + logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) + + global influxdb + if args.server != '': + influxdb = args.server + + global api_key + if args.api_key: + api_key = args.api_key + + global location_query + if args.location: + location_query = args.location + + server_address = (args.address, args.port) + HandlerClass = CarrierProxyRequestHandler + ServerClass = proxy.ThreadingHTTPServer + protocol = 'HTTP/1.1' + + HandlerClass.protocol_version = protocol + httpd = ServerClass(server_address, HandlerClass) + + sa = httpd.socket.getsockname() + logging.info('Serving HTTP Proxy on %s port %s ...', sa[0], sa[1]) + httpd.serve_forever() + return + +if __name__ == "__main__": + try: + main() + except (KeyboardInterrupt, SystemExit): + exit(1) diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..235d1c4 --- /dev/null +++ b/utils.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# Utility functions for infinity tracker. +import json +import urllib2 +import urlparse +from six import binary_type, text_type, integer_types, PY2 + + +# From python-influxdb +def _get_unicode(data, force=False): + """Try to return a text aka unicode object from the given data.""" + if isinstance(data, binary_type): + return data.decode('utf-8') + elif data is None: + return '' + elif force: + if PY2: + return unicode(data) + else: + return str(data) + else: + return data + + +# From python-influxdb +def _escape_tag(tag): + tag = _get_unicode(tag, force=True) + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +# From python-influxdb +def _escape_value(value): + value = _get_unicode(value) + if isinstance(value, text_type) and value != '': + return "\"{0}\"".format( + value.replace( + "\"", "\\\"" + ).replace( + "\n", "\\n" + ) + ) + elif isinstance(value, integer_types) and not isinstance(value, bool): + return str(value) + 'i' + else: + return str(value) + + +def extract_req_body(s): + qsl = urlparse.parse_qsl(s, keep_blank_values=True) + for k, v in qsl: + if k == 'data': + return v + return '' + + +def get_current_temp(api_key, location_query): + if not (api_key and location_query): + return False + base_url = 'http://api.wunderground.com/api/{}/'.format(api_key) + url = '{}/geolookup/conditions/q/{}.json'.format(base_url, + location_query) + f = urllib2.urlopen(url) + json_string = f.read() + parsed_json = json.loads(json_string) + temp_f = parsed_json['current_observation']['temp_f'] + f.close() + return temp_f