From 7f1775a03937f5cbebcae136a84f6131d8270cfd Mon Sep 17 00:00:00 2001 From: Patrick-Ze <19711799+Patrick-Ze@users.noreply.github.com> Date: Sun, 8 Oct 2023 21:11:08 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BC=93=E5=AD=98`path=5Fto=5Fid`=E8=AE=BF?= =?UTF-8?q?=E9=97=AE=E8=BF=87=E7=9A=84=E7=9B=AE=E5=BD=95=E4=BB=A5=E5=87=8F?= =?UTF-8?q?=E5=B0=91API=E8=AF=B7=E6=B1=82=EF=BC=8C=E7=BC=A9=E7=9F=AD?= =?UTF-8?q?=E6=9F=A5=E6=89=BE=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pikpakapi/__init__.py | 51 ++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/pikpakapi/__init__.py b/pikpakapi/__init__.py index f529dd1..8c7db03 100644 --- a/pikpakapi/__init__.py +++ b/pikpakapi/__init__.py @@ -53,6 +53,8 @@ def __init__(self, username: str, password: str, proxy: Optional[str] = None): self.user_id = None + self._path_id_cache = {} + def get_headers(self, access_token: Optional[str] = None) -> Dict[str, str]: """ Returns the headers to use for the requests. @@ -393,45 +395,54 @@ async def path_to_id(self, path: str, create: bool = False) -> List[Dict[str, st return [] paths = path.split("/") paths = [p.strip() for p in paths if len(p) > 0] - path_ids = [] - count = 0 + # 构造不同级别的path表达式,尝试找到距离目标最近的那一层 + multi_level_paths = ["/" + "/".join(paths[:i + 1]) for i in range(len(paths))] + path_ids = [self._path_id_cache[p] for p in multi_level_paths if p in self._path_id_cache] + # 判断缓存命中情况 + hit_cnt = len(path_ids) + if hit_cnt == len(paths): + return path_ids + elif hit_cnt == 0: + count = 0 + parent_id = None + else: + count = hit_cnt + parent_id = path_ids[-1]["id"] + next_page_token = None - parent_id = None while count < len(paths): + current_parent_path = "/" + "/".join(paths[:count]) data = await self.file_list( parent_id=parent_id, next_page_token=next_page_token ) - id = "" - file_type = "" + record_of_target_path = None for f in data.get("files", []): + current_path = "/" + "/".join(paths[:count] + [f.get("name")]) + file_type = "folder" if f.get("kind", "").find("folder") != -1 else "file" + record = {"id": f.get("id"), "name": f.get("name"), "file_type": file_type} + self._path_id_cache[current_path] = record if f.get("name") == paths[count]: - id = f.get("id") - file_type = ( - "folder" if f.get("kind", "").find("folder") != -1 else "file" - ) - break - if id: - path_ids.append( - { - "id": id, - "name": paths[count], - "file_type": file_type, - } - ) + record_of_target_path = record + # 不break: 剩下的文件也同样缓存起来 + if record_of_target_path is not None: + path_ids.append(record_of_target_path) count += 1 - parent_id = id + parent_id = record_of_target_path["id"] elif data.get("next_page_token") and (not next_page_token or next_page_token != data.get("next_page_token")): next_page_token = data.get("next_page_token") elif create: data = await self.create_folder(name=paths[count], parent_id=parent_id) id = data.get("file").get("id") - path_ids.append( + record = ( { "id": id, "name": paths[count], "file_type": "folder", } ) + path_ids.append(record) + current_path = "/" + "/".join(paths[:count+1]) + self._path_id_cache[current_path] = record count += 1 parent_id = id else: