Skip to content

Commit

Permalink
Improving logging (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
leikoilja authored Apr 7, 2021
1 parent 8bc7a14 commit 4427814
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 85 deletions.
195 changes: 111 additions & 84 deletions glocaltokens/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,65 +48,74 @@ def __init__(
"""
Initializes a Device. Can set or google_device or ip and port
"""
LOGGER.debug("Initializing new Device instance.")
log_prefix = f"[Device - {device_name}]"
LOGGER.debug("%s Initializing new Device instance", log_prefix)
self.device_name = device_name
self.local_auth_token = None
self.ip_address = ip_address
self.port = port
self.google_device = google_device
self.hardware = hardware

LOGGER.debug(
"Set self device_name to '%s', local_auth_token to None, "
"google_device to %s, and hardware to %s",
device_name,
google_device,
hardware,
)
# Token and name validations
if not self.device_name:
LOGGER.error("%s device_name must be provided", log_prefix)
return
if not token_utils.is_local_auth_token(local_auth_token):
LOGGER.error(
"%s local_auth_token must follow the correct format", log_prefix
)
return

# Setting IP and PORT
if google_device:
LOGGER.debug("google_device is not None")
LOGGER.debug("Setting self ip to %s", google_device.ip_address)
LOGGER.debug(
"%s google_device is provided, using it's IP and PORT", log_prefix
)
self.ip_address = google_device.ip_address
LOGGER.debug("Setting self port to %s", google_device.port)
self.port = google_device.port
else:
LOGGER.debug("google_device is None")
LOGGER.debug(
"%s google_device is not provided, "
"using manually provided IP and PORT",
log_prefix,
)
if (ip_address and not port) or (not ip_address and port):
LOGGER.error(
"Both ip and port must be set, if one of them is specified."
"%s google_device is not provided, "
"both IP(%s) and PORT(%s) must be manually provided",
log_prefix,
ip_address,
port,
)
return
LOGGER.debug("Setting self ip to %s", ip_address)
self.ip_address = ip_address
LOGGER.debug("Setting self port to %s", port)
self.port = port

if not local_auth_token:
LOGGER.error("local_auth_token not set")
return

if not self.device_name:
LOGGER.error("device_name not set")
return

if not token_utils.is_local_auth_token(local_auth_token):
LOGGER.error("local_auth_token doesn't follow the correct format.")
return

# IP and PORT validation
if (
self.ip_address
and not net_utils.is_valid_ipv4_address(self.ip_address)
and not net_utils.is_valid_ipv6_address(self.ip_address)
):
LOGGER.error("ip must be a valid IP address")
LOGGER.error("%s IP(%s) is invalid", log_prefix, self.ip_address)
return

if self.port and not net_utils.is_valid_port(self.port):
LOGGER.error("port must be a valid port")
LOGGER.error("%s PORT(%s) is invalid", log_prefix, self.port)
return

LOGGER.debug("Setting self local_auth_token to %s", censor(local_auth_token))
LOGGER.debug(
'%s Set device_name to "%s", '
'local_auth_token to "%s", '
'IP to "%s", PORT to "%s" and hardware to "%s"',
log_prefix,
device_name,
censor(local_auth_token),
self.ip_address,
self.port,
hardware,
)
self.local_auth_token = local_auth_token

def __str__(self) -> str:
Expand Down Expand Up @@ -140,31 +149,44 @@ def __init__(
Initialize an GLocalAuthenticationTokens instance with google account
credentials
:params
username: google account username (the first part of email,
excluding @gmail.com);
password: google account password (can be app password);
username: google account username;
password: google account password (can be an app password);
master_token: google master token (instead of username/password
combination);
android_id: the id of an android device. Will be randomly generated
if not set;
verbose: wheather or not print debug logging information
"""
self.logging_level = logging.DEBUG if verbose else logging.ERROR
LOGGER.setLevel(self.logging_level)

LOGGER.debug("Initializing new GLocalAuthenticationTokens instance.")

self.username: Optional[str] = username
self.password: Optional[str] = password
self.master_token: Optional[str] = master_token
self.android_id: Optional[str] = android_id
self.access_token: Optional[str] = None
self.access_token_date: Optional[datetime] = None
self.homegraph = None
self.homegraph_date: Optional[datetime] = None
LOGGER.debug(
'Set self username to "%s", password to "%s", '
"Set GLocalAuthenticationTokens client access_token, homegraph, "
"access_token_date and homegraph_date to None"
)

LOGGER.debug(
"Set GLocalAuthenticationTokens client "
'username to "%s", password to "%s", '
'master_token to "%s" and android_id to %s',
censor(username),
censor(password),
censor(master_token),
censor(android_id),
)

# Validation
if (not self.username or not self.password) and not self.master_token:
LOGGER.error(
"You must either provide google username/password "
Expand All @@ -174,29 +196,21 @@ def __init__(
if self.master_token and not token_utils.is_aas_et(self.master_token):
LOGGER.error("master_token doesn't follow the AAS_ET format")
return
self.access_token: Optional[str] = None
self.homegraph = None
self.access_token_date: Optional[datetime] = None
self.homegraph_date: Optional[datetime] = None
LOGGER.debug(
"Set self access_token, homegraph, "
"access_token_date and homegraph_date to None"
)

@staticmethod
def _generate_mac_string() -> str:
"""Generate random 14 char long string"""
LOGGER.debug("Generating mac...")
LOGGER.debug("Generating mac string...")
random_uuid = uuid4()
random_string = str(random_uuid).replace("-", "")[:ANDROID_ID_LENGTH]
mac_string = random_string.upper()
LOGGER.debug("Generated mac: %s", mac_string)
LOGGER.debug("Generated mac string: %s", mac_string)
return mac_string

def get_android_id(self) -> str:
"""Return existing or generate android id"""
if not self.android_id:
LOGGER.debug("There is not any stored android_id, generating a new one")
LOGGER.debug("There is no stored android_id, generating a new one")
self.android_id = self._generate_mac_string()
return self.android_id

Expand All @@ -212,7 +226,10 @@ def get_master_token(self) -> Optional[str]:
return None

if not self.master_token:
LOGGER.debug("There is not any stored master_token, logging in...")
LOGGER.debug(
"There is no stored master_token, "
"logging in using username and password"
)
res = perform_master_login(
self.username, self.password, self.get_android_id()
)
Expand All @@ -221,7 +238,7 @@ def get_master_token(self) -> Optional[str]:
LOGGER.debug("Request response: %s", res)
return None
self.master_token = res["Token"]
LOGGER.debug("Master token: %s", self.master_token)
LOGGER.debug("Master token: %s", censor(self.master_token))
return self.master_token

def get_access_token(self) -> Optional[str]:
Expand All @@ -230,8 +247,8 @@ def get_access_token(self) -> Optional[str]:
self.access_token_date, ACCESS_TOKEN_DURATION
):
LOGGER.debug(
"There is not any stored access_token, "
"or the stored one has expired, getting a new one..."
"There is no access_token stored, "
"or it has expired, getting a new one..."
)
master_token = self.get_master_token()
if master_token is None:
Expand All @@ -255,49 +272,59 @@ def get_access_token(self) -> Optional[str]:
self.access_token = res["Auth"]
self.access_token_date = datetime.now()
LOGGER.debug(
"Access token: %s, datetime %s", self.access_token, self.access_token_date
"Access token: %s, datetime %s",
censor(self.access_token),
self.access_token_date,
)
return self.access_token

# pylint: disable=no-member
def get_homegraph(self):
"""Returns the entire Google Home Foyer V2 service"""
if self.homegraph is None or self._has_expired(
self.homegraph_date, HOMEGRAPH_DURATION
):
LOGGER.debug(
"There is not any stored homegraph, "
"or the stored one has expired, getting a new one..."
"There is no stored homegraph, or it has expired, getting a new one..."
)
log_prefix = "[GRPC]"
try:
LOGGER.debug("Creating SSL channel credentials...")
LOGGER.debug("%s Creating SSL channel credentials...", log_prefix)
scc = grpc.ssl_channel_credentials(root_certificates=None)
LOGGER.debug("Creating access token call credentials...")
LOGGER.debug("%s Creating access token call credentials...", log_prefix)
tok = grpc.access_token_call_credentials(self.get_access_token())
LOGGER.debug("Compositing channel credentials...")
LOGGER.debug("%s Compositing channel credentials...", log_prefix)
ccc = grpc.composite_channel_credentials(scc, tok)

LOGGER.debug(
"Establishing secure channel with the Google Home Foyer API..."
"%s Establishing secure channel with "
"the Google Home Foyer API...",
log_prefix,
)
with grpc.secure_channel(GOOGLE_HOME_FOYER_API, ccc) as channel:
LOGGER.debug("Getting channels StructuresServiceStub...")
LOGGER.debug(
"%s Getting channels StructuresServiceStub...", log_prefix
)
rpc_service = v1_pb2_grpc.StructuresServiceStub(channel)
LOGGER.debug("Getting HomeGraph request...")
LOGGER.debug("%s Getting HomeGraph request...", log_prefix)
request = v1_pb2.GetHomeGraphRequest(string1="", num2="")
LOGGER.debug("Fetching HomeGraph...")
LOGGER.debug("%s Fetching HomeGraph...", log_prefix)
response = rpc_service.GetHomeGraph(request)
LOGGER.debug("Storing gotten homegraph...")
LOGGER.debug("%s Storing obtained HomeGraph...", log_prefix)
self.homegraph = response
self.homegraph_date = datetime.now()
except grpc.RpcError as rpc_error:
LOGGER.debug("Got an RpcError.")
# pylint: disable=no-member
LOGGER.debug("%s Got an RpcError", log_prefix)
if rpc_error.code().name == "UNAUTHENTICATED":
LOGGER.warning("The access token has expired. Getting a new one.")
LOGGER.warning(
"%s The access token has expired. Getting a new one.",
log_prefix,
)
self.invalidate_access_token()
return self.get_homegraph()
LOGGER.error(
"Received unknown RPC error: code=%s " "message=%s",
"%s Received unknown RPC error: code=%s message=%s",
log_prefix,
rpc_error.code(),
rpc_error.details(),
)
Expand Down Expand Up @@ -327,21 +354,20 @@ def get_google_devices(
models_list = models_list if models_list else []

if force_homegraph_reload:
LOGGER.debug("Forcing homegraph reload.")
LOGGER.debug("Forcing homegraph reload")
self.invalidate_homegraph()

LOGGER.debug("Getting homegraph...")
homegraph = self.get_homegraph()
LOGGER.debug("Getting network devices...")
network_devices = (
discover_devices(

network_devices = []
if disable_discovery is False:
LOGGER.debug("Getting network devices...")
network_devices = discover_devices(
models_list,
zeroconf_instance=zeroconf_instance,
logging_level=self.logging_level,
)
if not disable_discovery
else None
)

def find_device(name) -> Optional[GoogleDevice]:
for device in network_devices:
Expand All @@ -350,9 +376,7 @@ def find_device(name) -> Optional[GoogleDevice]:
return None

devices: List[Device] = []
LOGGER.debug(
"Iterating in homegraph devices (len=%s)", len(homegraph.home.devices)
)
LOGGER.debug("Iterating in %d homegraph devices", len(homegraph.home.devices))
for item in homegraph.home.devices:
if item.local_auth_token != "":
# This checks if the current item is a valid model,
Expand All @@ -363,13 +387,11 @@ def find_device(name) -> Optional[GoogleDevice]:
LOGGER.debug("%s not in models_list", item.hardware.model)
continue

LOGGER.debug(
"Looking for a device in local network? %s",
network_devices is not None,
)
google_device = (
find_device(item.device_name) if network_devices else None
)
google_device = None
if network_devices:
LOGGER.debug("Looking for '%s' in local network", item.device_name)
google_device = find_device(item.device_name)

device = Device(
device_name=item.device_name,
local_auth_token=item.local_auth_token,
Expand All @@ -380,13 +402,18 @@ def find_device(name) -> Optional[GoogleDevice]:
LOGGER.debug("Adding %s to devices list", device.device_name)
devices.append(device)
else:
LOGGER.warning("Device initialization failed, skipping.")
LOGGER.warning(
"%s device initialization failed "
"because of missing local_auth_token, skipping.",
device.device_name,
)
else:
LOGGER.debug(
"local_auth_token is not initialized for %s", item.device_name
"'%s' local_auth_token is " "not found in Homegraph, skipping",
item.device_name,
)

LOGGER.debug("Google Home devices: %s", devices)
LOGGER.debug("Sucessfully initialized %d Google Home devices", len(devices))
return devices

def get_google_devices_json(
Expand Down
1 change: 0 additions & 1 deletion tests/assertions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import glocaltokens.utils.token as token_utils


# pylint: disable=too-few-public-methods
class DeviceAssertions:
"""Device specific assessors"""

Expand Down

0 comments on commit 4427814

Please sign in to comment.