Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Now works with locally exported timeline. #65

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 121 additions & 18 deletions location_history_json_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@
shapely_available = True


def _get_timestampms(s):
def _get_timestamp(s):
if "timestampMs" in s:
return s["timestampMs"]
return str(int(isoparse(s["timestamp"]).timestamp() * 1000))
return datetime.fromtimestamp(int(s["timestampMs"]) / 1000)
return datetime.fromtimestamp(int(isoparse(s["timestamp"]).timestamp()))

def _valid_date(s):
try:
Expand Down Expand Up @@ -185,14 +185,14 @@ def _write_location(output, format, location, separator, first, last_location):

if format == "csv":
output.write(separator.join([
datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000).strftime("%Y-%m-%d %H:%M:%S"),
_get_timestamp(location).strftime("%Y-%m-%d %H:%M:%S"),
"%.8f" % (location["latitudeE7"] / 10000000),
"%.8f" % (location["longitudeE7"] / 10000000)
]) + "\n")

if format == "csvfull":
output.write(separator.join([
datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000).strftime("%Y-%m-%d %H:%M:%S"),
_get_timestamp(location).strftime("%Y-%m-%d %H:%M:%S"),
"%.8f" % (location["latitudeE7"] / 10000000),
"%.8f" % (location["longitudeE7"] / 10000000),
str(location.get("accuracy", "")),
Expand All @@ -204,7 +204,7 @@ def _write_location(output, format, location, separator, first, last_location):

if format == "csvfullest":
output.write(separator.join([
datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000).strftime("%Y-%m-%d %H:%M:%S"),
_get_timestamp(location).strftime("%Y-%m-%d %H:%M:%S"),
"%.8f" % (location["latitudeE7"] / 10000000),
"%.8f" % (location["longitudeE7"] / 10000000),
str(location.get("accuracy", "")),
Expand Down Expand Up @@ -238,7 +238,7 @@ def _write_location(output, format, location, separator, first, last_location):

# Order of these tags is important to make valid KML: TimeStamp, ExtendedData, then Point
output.write(" <TimeStamp><when>")
time = datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000)
time = _get_timestamp(location)
output.write(time.strftime("%Y-%m-%dT%H:%M:%SZ"))
output.write("</when></TimeStamp>\n")
if "accuracy" in location or "speed" in location or "altitude" in location:
Expand Down Expand Up @@ -271,7 +271,7 @@ def _write_location(output, format, location, separator, first, last_location):
if "altitude" in location:
output.write(" <ele>%d</ele>\n" % location["altitude"])

time = datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000)
time = _get_timestamp(location)
output.write(" <time>%s</time>\n" % time.strftime("%Y-%m-%dT%H:%M:%SZ"))
output.write(" <desc>%s" % time.strftime("%Y-%m-%d %H:%M:%S"))
if "accuracy" in location or "speed" in location:
Expand All @@ -292,7 +292,7 @@ def _write_location(output, format, location, separator, first, last_location):
output.write(" <trkseg>\n")

if last_location:
timedelta = abs((int(_get_timestampms(location)) - int(_get_timestampms(last_location))) / 1000 / 60)
timedelta = abs((_get_timestamp(location) - _get_timestamp(last_location)).total_seconds() / 60)
distancedelta = _distance(
location["latitudeE7"] / 10000000,
location["longitudeE7"] / 10000000,
Expand All @@ -313,7 +313,7 @@ def _write_location(output, format, location, separator, first, last_location):

if "altitude" in location:
output.write(" <ele>%d</ele>\n" % location["altitude"])
time = datetime.utcfromtimestamp(int(_get_timestampms(location)) / 1000)
time = _get_timestamp(location)
output.write(" <time>%s</time>\n" % time.strftime("%Y-%m-%dT%H:%M:%SZ"))
if "accuracy" in location or "speed" in location:
output.write(" <desc>\n")
Expand Down Expand Up @@ -344,6 +344,60 @@ def _write_footer(output, format):
output.write(" </trk>\n")
output.write("</gpx>\n")
return


def _get_latitudeE7(point):
"""Gets the latitude * 10E7 from a lat, lon pair in a string"""
return int(float(point.replace('°', '').replace(' ', '').split(",")[0]) * 10000000)


def _get_longitudeE7(point):
"""Gets the longitude * 10E7 from a lat, lon pair in a string"""
return int(float(point.replace('°', '').replace(' ', '').split(",")[1]) * 10000000)


def _convert_legacy_to_pre_2025(data):
"""Converts the legacy data as of 2025 to the earlier format"""

pre2025data = {}
pre2025data["locations"] = []

for item in data:
if "timelinePath" in item:
for location in item["timelinePath"]:
pre2025location = {}
pre2025location["latitudeE7"] = _get_latitudeE7(location["point"])
pre2025location["longitudeE7"] = _get_longitudeE7(location["point"])
pre2025location["timestamp"] = location["time"]
pre2025data["locations"].append(pre2025location)

return pre2025data


def _convert_to_pre_2025(data):
"""Converts the data as of 2025 to the earlier format"""

pre2025data = {}
pre2025data["locations"] = []

for item in data:
if "position" in item:
location = item["position"]
pre2025location = {}
pre2025location["latitudeE7"] = _get_latitudeE7(location["LatLng"])
pre2025location["longitudeE7"] = _get_longitudeE7(location["LatLng"])
pre2025location["timestamp"] = location["timestamp"]
if "accuracyMeters" in location:
pre2025location["accuracy"] = location["accuracyMeters"]
if "altitudeMeters" in location:
pre2025location["altitude"] = int(location["altitudeMeters"])
if "source" in location:
pre2025location["source"] = location["source"]
if "speedMetersPerSecond" in location:
pre2025location["velocity"] = int(location["speedMetersPerSecond"])
pre2025data["locations"].append(pre2025location)

return pre2025data


def convert(locations, output, format="kml",
Expand Down Expand Up @@ -390,7 +444,7 @@ def convert(locations, output, format="kml",
"""

if chronological:
locations = sorted(locations, key=_get_timestampms)
locations = sorted(locations, key=_get_timestamp)

_write_header(output, format, js_variable, separator)

Expand All @@ -402,7 +456,7 @@ def convert(locations, output, format="kml",
if "longitudeE7" not in item or "latitudeE7" not in item or (("timestampMs" not in item) and ("timestamp" not in item)):
continue

time = datetime.utcfromtimestamp(int(_get_timestampms(item)) / 1000)
time = _get_timestamp(item)
print("\r%s / Locations written: %s" % (time.strftime("%Y-%m-%d %H:%M"), added), end="")

if accuracy is not None and "accuracy" in item and item["accuracy"] > accuracy:
Expand Down Expand Up @@ -457,18 +511,48 @@ def main():
action="store_true"
)

arg_parser.add_argument("-s", "--startdate", help="The Start Date - format YYYY-MM-DD (defaults to 0h00m)", type=_valid_date)
arg_parser.add_argument("-e", "--enddate", help="The End Date - format YYYY-MM-DD (defaults to 23h59m59s)", type=_valid_date)
arg_parser.add_argument("--starttime", help="The Start Time - format HH:MM, only used if Start Date is set", type=_valid_time)
arg_parser.add_argument("--endtime", help="The End Time - format HH:MM, only used if End Date is set", type=_valid_time)
arg_parser.add_argument("-a", "--accuracy", help="Maximum accuracy (in meters), lower is better.", type=int)
arg_parser.add_argument(
"-s", "--startdate",
help="The Start Date - format YYYY-MM-DD (defaults to 0h00m)",
type=_valid_date
)

arg_parser.add_argument(
"-e", "--enddate",
help="The End Date - format YYYY-MM-DD (defaults to 23h59m59s)",
type=_valid_date
)

arg_parser.add_argument(
"--starttime",
help="The Start Time - format HH:MM, only used if Start Date is set",
type=_valid_time
)

arg_parser.add_argument(
"--endtime",
help="The End Time - format HH:MM, only used if End Date is set",
type=_valid_time
)

arg_parser.add_argument(
"-a", "--accuracy",
help="Maximum accuracy (in meters), lower is better.",
type=int
)

arg_parser.add_argument(
"-c", "--chronological",
help="Sort items in chronological order (might be unnessary)",
action="store_true"
)

arg_parser.add_argument(
"-l", "--legacy",
help="Use the legacy data in exports from the mobile device (no effect on data exported from servers)",
action="store_true"
)

arg_parser.add_argument(
"-v", "--variable",
default="locationJsonData",
Expand Down Expand Up @@ -547,7 +631,20 @@ def main():
print("ijson is not available. Please install with `pip install ijson` and try again.")
return

items = ijson.items(open(args.input, "r"), "locations.item")
try:
with open(args.input, "r") as f:
items = ijson.items(f, "locations.item")
if not len(list(items)):
f.seek(0)
if args.legacy:
data = _convert_legacy_to_pre_2025(ijson.items(f, "semanticSegments.item"))
else:
data = _convert_to_pre_2025(ijson.items(f, "rawSignals.item"))
items = data["locations"]

except OSError as error:
print("Error opening input file %s: %s" % (args.input, error))
return

else:
try:
Expand All @@ -565,6 +662,12 @@ def main():
except ValueError as error:
print("Error decoding json: %s" % error)
return

if not "locations" in data:
if args.legacy:
data = _convert_legacy_to_pre_2025(data["semanticSegments"])
else:
data = _convert_to_pre_2025(data["rawSignals"])

items = data["locations"]

Expand Down