Source code for zcooldl.zcooldl

# @Author: lonsty
# @Date:   2019-09-07 18:34:18
import json
import math
import os.path as op
import re
import sys
import threading
import time
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from datetime import datetime
from pathlib import Path
from queue import Empty, Queue
from typing import List
from urllib.parse import urljoin, urlparse
from uuid import uuid4

import requests
from bs4 import BeautifulSoup
from termcolor import colored, cprint

from zcooldl.utils import (mkdirs_if_not_exist, retry, safe_filename,
                           sort_records)

Scrapy = namedtuple('Scrapy', 'type author title objid index url')  # 用于记录下载任务
HEADERS = {
    'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36'
}
HOST_PAGE = 'https://www.zcool.com.cn'
SEARCH_DESIGNER_SUFFIX = '/search/designer?&word={word}'
USER_SUFFIX = '/u/{id}'
PAGE_SUFFIX = '?myCate=0&sort=1&p={page}'
WORK_SUFFIX = '/work/content/show?p=1&objectId={objid}'
COLLECTION_SUFFIX = '/collection/contents?id={objid}&p={page}&pageSize=25'
USER_API = 'https://www.zcool.com.cn/member/card/{id}'
TIMEOUT = 30
Q_TIMEOUT = 1
MAX_WORKERS = 20
RETRIES = 3

thread_local = threading.local()


[docs]def get_session(): """使线程获取同一个 Session,可减少 TCP 连接数,加速请求。 :return requests.Session: session """ if not hasattr(thread_local, "session"): thread_local.session = requests.Session() return thread_local.session
[docs]@retry(Exception, tries=RETRIES) def session_request(url: str, method: str = 'GET') -> requests.Response: """使用 session 请求数据。使用了装饰器 retry,在网络异常导致错误时会重试。 :param str url: 目标请求 URL :param str method: 请求方式 :return requests.Response: 响应数据 """ resp = get_session().request(method, url, headers=HEADERS, timeout=TIMEOUT) resp.raise_for_status() return resp
[docs]class ZCoolScraper(): def __init__(self, user_id=None, username=None, collection=None, destination=None, max_pages=None, spec_topics=None, max_topics=None, max_workers=None, retries=None, redownload=None, overwrite=False, thumbnail=False): """初始化下载参数。 :param int user_id: 用户 ID :param str username: 用户名 :param HttpUrl collection: 收藏集 URL :param str destination: 图片保存到本地的路径,默认当前路径 :param int max_pages: 最大爬取页数,默认所有 :param list spec_topics: 需要下载的特定主题 :param int max_topics: 最大下载主题数量,默认所有 :param int max_workers: 线程开启个数,默认 20 :param int retries: 请求异常时的重试次数,默认 3 :param str redownload: 下载记录文件,给定此文件则从失败记录进行下载 :param bool overwrite: 是否覆盖已存在的文件,默认 False :param bool thumbnail: 是否下载缩略图,默认 False """ self.start_time = datetime.now() print(f' - - - - - -+-+ {self.start_time.ctime()} +-+- - - - - -\n') self.collection = collection self.spec_topics = spec_topics self.max_topics = max_topics or 'all' self.max_workers = max_workers or MAX_WORKERS self.pool = ThreadPoolExecutor(self.max_workers) self.overwrite = overwrite self.thumbnail = thumbnail self.pages = Queue() self.topics = Queue() self.images = Queue() self.stat = { 'npages': 0, 'ntopics': 0, 'nimages': 0, 'pages_pass': set(), 'pages_fail': set(), 'topics_pass': set(), 'topics_fail': set(), 'images_pass': set(), 'images_fail': set() } if retries: # 重置全局变量 RETRIES global RETRIES RETRIES = retries dest = Path(destination or '', urlparse(HOST_PAGE).netloc) # 从记录文件中的失败项开始下载 if redownload: self.username = self.reload_records(redownload) self.user_id = self.search_id_by_username(self.username) self.max_pages = self.pages.qsize() self.max_topics = self.topics.qsize() self.directory = dest / safe_filename(self.username) self.stat.update({ 'npages': self.max_pages, 'ntopics': self.max_topics, 'nimages': self.images.qsize() }) print(f'{"Username".rjust(17)}: {colored(self.username, "cyan")}\n' f'{"User ID".rjust(17)}: {self.user_id}\n' f'{"Pages to scrapy".rjust(17)}: {self.max_pages:2d}\n' f'{"Topics to scrapy".rjust(17)}: {self.max_topics:3d}\n' f'{"Images to scrapy".rjust(17)}: {self.images.qsize():4d}\n' f'Storage directory: {colored(self.directory, attrs=["underline"])}', end='\n\n') self.fetch_all(initialized=True) return # 从收藏集下载 if collection: objid = self.parse_objid(collection, is_collection=True) resp = session_request(urljoin(HOST_PAGE, COLLECTION_SUFFIX.format(objid=objid, page=1))) data = resp.json().get('data', {}) total = data.get('total', 0) page_size = data.get('pageable', {}).get('pageSize') max_pages_ = math.ceil(total / page_size) self.max_pages = min(max_pages or 9999, max_pages_) self.directory = dest / safe_filename(f'{self.username}-{self._collection_name}') self.parse_collection_topics(data.get('content')) # 解析第 2 页 至 最大页的 topic 到下载任务 for page in range(2, self.max_pages + 1): resp = session_request(urljoin(HOST_PAGE, COLLECTION_SUFFIX.format(objid=objid, page=page))) self.parse_collection_topics(topics=resp.json().get('data', {}).get('content'), offset=page_size * (page - 1)) # 根据用户 ID 或用户名下载 else: self.user_id = user_id or self.search_id_by_username(username) self.base_url = urljoin(HOST_PAGE, USER_SUFFIX.format(id=self.user_id)) try: response = session_request(self.base_url) except requests.exceptions.ProxyError: cprint('Cannot connect to proxy.', 'red') sys.exit(1) except Exception as e: cprint(f'Failed to connect to {self.base_url}, {e}', 'red') sys.exit(1) soup = BeautifulSoup(markup=response.text, features='html.parser') try: author = soup.find(name='div', id='body').get('data-name') if username and username != author: cprint(f'Invalid user id:「{user_id}」or username:「{username}」!', 'red') sys.exit(1) self.username = author except Exception: self.username = username or 'anonymous' self.directory = dest / safe_filename(self.username) try: max_pages_ = int(soup.find(id='laypage_0').find_all(name='a')[-2].text) except Exception: max_pages_ = 1 self.max_pages = min(max_pages or 9999, max_pages_) if self.spec_topics: topics = ', '.join(self.spec_topics) elif self.max_topics == 'all': topics = 'all' else: topics = self.max_pages * self.max_topics print(f'{"Username".rjust(17)}: {colored(self.username, "cyan")}\n' f'{"User ID".rjust(17)}: {self.user_id}\n' f'{"Maximum pages".rjust(17)}: {max_pages_}\n' f'{"Pages to scrapy".rjust(17)}: {self.max_pages}\n' f'{"Topics to scrapy".rjust(17)}: {topics}\n' f'Storage directory: {colored(self.directory, attrs=["underline"])}', end='\n\n') self.END_PARSING_TOPICS = False self.fetch_all(initialized=True if self.collection else False)
[docs] def search_id_by_username(self, username): """通过用户昵称查找用户 ID。 :param str username: 用户昵称 :return int: 用户 ID """ if not username: cprint('Must give an <user id> or <username>!', 'yellow') sys.exit(1) search_url = urljoin(HOST_PAGE, SEARCH_DESIGNER_SUFFIX.format(word=username)) try: response = session_request(search_url) except requests.exceptions.ProxyError: cprint('Cannot connect to proxy.', 'red') sys.exit(1) except Exception as e: cprint(f'Failed to connect to {search_url}, {e}', 'red') sys.exit(1) author_1st = BeautifulSoup(response.text, 'html.parser').find(name='div', class_='author-info') if (not author_1st) or (author_1st.get('data-name') != username): cprint(f'Username「{username}」does not exist!', 'yellow') sys.exit(1) return author_1st.get('data-id')
[docs] def reload_records(self, file): """从本地下载记录里读取下载失败的内容。 :param str file: 下载记录文件的路径。 :return str: 用户名 """ with open(file, 'r', encoding='utf-8') as f: for fail in json.loads(f.read()).get('fail'): scrapy = Scrapy._make(fail.values()) if scrapy.type == 'page': self.pages.put(scrapy) elif scrapy.type == 'topic': self.topics.put(scrapy) elif scrapy.type == 'image': self.images.put(scrapy) return scrapy.author
[docs] def generate_pages(self): """根据最大下载页数,生成需要爬取主页的任务。""" for page in range(1, self.max_pages + 1): suffix = COLLECTION_SUFFIX if self.collection else PAGE_SUFFIX url = urljoin(self.base_url, suffix.format(page=page)) scrapy = Scrapy(type='page', author=self.username, title=page, objid=None, index=page - 1, url=url) if scrapy not in self.stat["pages_pass"]: self.pages.put(scrapy)
[docs] def parse_collection_topics(self, topics: List[dict], offset: int = 0): for idx, topic in enumerate(topics): new_scrapy = Scrapy(type='topic', author=topic.get('creatorObj', {}).get('username'), title=topic.get('title'), objid=topic.get('id'), index=offset + idx, url=topic.get('pageUrl')) if new_scrapy not in self.stat["topics_pass"]: self.topics.put(new_scrapy) self.stat["ntopics"] += 1
[docs] def parse_topics(self, scrapy): """爬取主页,解析所有 topic,并将爬取主题的任务添加到任务队列。 :param scrapy: 记录任务信息的数据体 :return Scrapy: 记录任务信息的数据体 """ resp = session_request(scrapy.url) cards = BeautifulSoup(resp.text, 'html.parser').find_all(name='a', class_='card-img-hover') for idx, card in enumerate(cards if self.max_topics == 'all' else cards[:self.max_topics + 1]): title = card.get('title') if self.spec_topics and (title not in self.spec_topics): continue new_scrapy = Scrapy(type='topic', author=scrapy.author, title=title, objid=None, index=idx, url=card.get('href')) if new_scrapy not in self.stat["topics_pass"]: self.topics.put(new_scrapy) self.stat["ntopics"] += 1 return scrapy
[docs] def fetch_topics(self): """从任务队列中获取要爬取的主页,使用多线程处理得到需要爬取的主题。""" page_futures = {} while True: try: scrapy = self.pages.get(timeout=Q_TIMEOUT) page_futures[self.pool.submit(self.parse_topics, scrapy)] = scrapy except Empty: break except Exception: continue for future in as_completed(page_futures): scrapy = page_futures.get(future) try: future.result() self.stat["pages_pass"].add(scrapy) except Exception: self.stat["pages_fail"].add(scrapy) cprint(f'GET page: {scrapy.title} ({scrapy.url}) failed.', 'red') self.END_PARSING_TOPICS = True
[docs] def parse_objid(self, url: str, is_collection: bool = False) -> str: """根据 topic 页面解析 objid :param url: topic 或 collection 的 URL :return: objid """ soup = BeautifulSoup(session_request(url).text, 'html.parser') objid = soup.find('input', id='dataInput').attrs.get('data-objid') if is_collection: self._collection_name = soup.find('h2', class_='title-h2').text user = soup.find(name='span', class_='details-user-avatar') self.user_id = user.find('div').attrs.get('data-id') self.username = user.find('a').attrs.get('title') return objid
[docs] def parse_images(self, scrapy): """爬取 topic,获得 objid 后直接调用 API,从返回数据里获得图片地址等信息, 并将下载图片的任务添加到任务队列。 :param scrapy: 记录任务信息的数据体 :return Scrapy: 记录任务信息的数据体 """ objid = scrapy.objid or self.parse_objid(scrapy.url) resp = session_request(urljoin(HOST_PAGE, WORK_SUFFIX.format(objid=objid))) data = resp.json().get('data', {}) author = data.get('product', {}).get('creatorObj', {}).get('username') title = data.get('product', {}).get('title') objid = data.get('product', {}).get('id') for img in data.get('allImageList', []): new_scrapy = Scrapy(type='image', author=author, title=title, objid=objid, index=img.get('orderNo') or 0, url=img.get('url')) if new_scrapy not in self.stat["images_pass"]: self.images.put(new_scrapy) self.stat["nimages"] += 1 return scrapy
[docs] def fetch_images(self): """从任务队列中获取要爬取的主题,使用多线程处理得到需要下载的图片。""" image_futures = {} while True: try: scrapy = self.topics.get(timeout=Q_TIMEOUT) image_futures[self.pool.submit(self.parse_images, scrapy)] = scrapy except Empty: if self.END_PARSING_TOPICS: break except Exception: continue for future in as_completed(image_futures): scrapy = image_futures.get(future) try: future.result() self.stat["topics_pass"].add(scrapy) except Exception: self.stat["topics_fail"].add(scrapy) cprint(f'GET topic: {scrapy.title} ({scrapy.url}) failed.', 'red')
[docs] def fetch_all(self, initialized: bool = False): """同时爬取主页、主题,并更新状态。""" if not initialized: self.generate_pages() fetch_futures = [self.pool.submit(self.fetch_topics), self.pool.submit(self.fetch_images)] end_show_fetch = False t = threading.Thread(target=self.show_fetch_status, kwargs={'end': lambda: end_show_fetch}) t.start() try: wait(fetch_futures) except KeyboardInterrupt: raise finally: end_show_fetch = True t.join()
[docs] def show_fetch_status(self, interval=0.5, end=None): """用于后台线程,实现边爬取边显示状态。 :param int interval: 状态更新间隔,秒 :param function end: 用于控制退出线程 """ while True: status = 'Fetched Pages: {pages}\tTopics: {topics}\tImages: {images}'.format( pages=colored(str(self.max_pages).rjust(3), 'blue'), topics=colored(str(self.stat["ntopics"]).rjust(3), 'blue'), images=colored(str(self.stat["nimages"]).rjust(5), 'blue')) print(status, end='\r', flush=True) if (interval == 0) or (end and end()): print('\n') break time.sleep(interval)
[docs] def show_download_status(self, interval=0.5, end=None): """用于后台线程,实现边下载边显示状态。 :param int interval: 状态更新间隔,秒 :param function end: 用于控制退出线程 """ while True: completed = len(self.stat["images_pass"]) + len(self.stat["images_fail"]) if self.stat["nimages"] > 0: status = 'Time used: {time_used}\tFailed: {failed}\tCompleted: {completed}'.format( time_used=colored(str(datetime.now() - self.start_time)[:-7], 'yellow'), failed=colored(str(len(self.stat["images_fail"])).rjust(3), 'red'), completed=colored(str(int(completed / self.stat["nimages"] * 100)) + f'% ({completed}/{self.stat["nimages"]})', 'green')) print(status, end='\r', flush=True) if (interval == 0) or (end and end()): if self.stat["nimages"] > 0: print('\n') break time.sleep(interval)
[docs] def download_image(self, scrapy): """下载图片保存到本地。 :param scrapy: 记录任务信息的数据体 :return Scrapy: 记录任务信息的数据体 """ try: name = re.findall(r'(?<=/)\w*?\.(?:jpg|gif|png|bmp)', scrapy.url, re.IGNORECASE)[0] except IndexError: name = uuid4().hex + '.jpg' path = self.directory / safe_filename(scrapy.title) filename = path / f'[{scrapy.index + 1 or 0:02d}]{name}' if (not self.overwrite) and op.isfile(filename): return scrapy url = scrapy.url if self.thumbnail: if url.lower().endswith(('jpg', 'png', 'bmp')): url = f'{scrapy.url}@1280w_1l_2o_100sh.{url[-3:]}' resp = session_request(url) mkdirs_if_not_exist(path) with open(filename, 'wb') as f: for chunk in resp.iter_content(8192): f.write(chunk) return scrapy
[docs] def save_records(self): """将成功及失败的下载记录保存到本地文件。 :return str: 记录文件的路径 """ filename = f'{safe_filename(self.start_time.isoformat()[:-7])}.json' abspath = op.abspath(self.directory / filename) with open(abspath, 'w', encoding='utf-8') as f: success = (self.stat["pages_pass"] | self.stat["topics_pass"] | self.stat["images_pass"]) fail = (self.stat["pages_fail"] | self.stat["topics_fail"] | self.stat["images_fail"]) type_order = {'page': 1, 'topic': 2, 'image': 3} s_ordered = sort_records(success, order=type_order) f_ordered = sort_records(fail, order=type_order) records = { 'time': self.start_time.isoformat(), 'success': [scrapy._asdict() for scrapy in s_ordered], 'fail': [scrapy._asdict() for scrapy in f_ordered] } f.write(json.dumps(records, ensure_ascii=False, indent=2)) return abspath
[docs] def run_scraper(self): """使用多线程下载所有图片,完成后保存记录并退出程序。""" end_show_download = False t = threading.Thread(target=self.show_download_status, kwargs={'end': lambda: end_show_download}) t.start() image_futuress = {} while True: try: scrapy = self.images.get_nowait() if scrapy not in self.stat["images_pass"]: image_futuress[self.pool.submit(self.download_image, scrapy)] = scrapy except Empty: break except KeyboardInterrupt: raise except Exception: continue try: for future in as_completed(image_futuress): scrapy = image_futuress.get(future) try: future.result() self.stat["images_pass"].add(scrapy) except Exception: self.stat["images_fail"].add(scrapy) cprint(f'Download image: {scrapy.title}[{scrapy.index + 1}] ' f'({scrapy.url}) failed.', 'red') except KeyboardInterrupt: raise finally: end_show_download = True t.join() saved_images = len(self.stat["images_pass"]) failed_images = len(self.stat["images_fail"]) if saved_images or failed_images: if saved_images: print(f'Saved {colored(saved_images, "green")} images to ' f'{colored(self.directory.absolute(), attrs=["underline"])}') records_path = self.save_records() print(f'Saved records to {colored(records_path, attrs=["underline"])}') else: cprint('No images to download.', 'yellow')