tideillusion
179天前
979
举报

【学爬01】requests模拟浏览器登录lgulife

对Python爬虫感兴趣,随缘开个贴记录下学习内容。

若有同好发现可改进之处,请一定要评论指出。

欢迎讨论!

import functools
import json
import random

import pandas as pd
import requests
from bs4 import BeautifulSoup as bs
from lxml import html


class lgulife_api:
    """
    A simple tool to visit lgulife.com with Python.

    The id of a post can be found at the end of its url.
    Examples:
        The id of
        https://www.lgulife.com/bbs/post/929/
        is 929.

    The ids of thread/comment/reply under a post can be found using the get_comment method.

    Attributes:
        _cookies: A dict collecting the cookies received from login/get_post responses.
        _session: A requests.Session object used for every request.
        __name: User's name.
        __password: User's password.
        __token: CSRF token read from the login page.
        __session_id: Login cookie value (empty until a login response provides one).
        login_url: Url of login page.
        register_url: Url of register page.
        __is_logged_in: Whether login has succeeded.
        base_header: Basic request header.
    """

    _POST_URL = "https://www.lgulife.com/bbs/post/%d/"
    _CSRF_XPATH = '//input[@name="csrfmiddlewaretoken"]/@value'
    _USER_AGENTS = (
        'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/532.0 (KHTML, like Gecko) Chrome/4.0.203.0 Safari/532.0',
        'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/533.2 (KHTML, like Gecko) Chrome/5.0.342.5 Safari/533.2',
        'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/532.0 (KHTML, like Gecko) Chrome/3.0.195.10 Safari/532.0',
        'Mozilla/5.0 (X11; U; Linux x86_64; en-US) AppleWebKit/532.0 (KHTML, like Gecko) Chrome/4.0.211.2 Safari/532.0',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/32.0.1664.3 Safari/537.36',
        'Mozilla/5.0 ArchLinux (X11; U; Linux x86_64; en-US) AppleWebKit/534.30 (KHTML, like Gecko) Chrome/12.0.742.100 Safari/534.30',
        'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/533.3 (KHTML, like Gecko) Chrome/8.0.552.224 Safari/533.3',
        'Mozilla/5.0 (Windows; U; Windows NT 6.0; en-US) AppleWebKit/525.19 (KHTML, like Gecko) Chrome/1.0.154.50 Safari/525.19',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.34 Safari/534.24',
        'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US) AppleWebKit/532.0 (KHTML, like Gecko) Chrome/3.0.198.0 Safari/532.0',
    )

    def __init__(self):
        self._cookies = {}
        self._session = requests.Session()
        self.__name = ""
        self.__password = ""
        self.__token = ""
        self.__session_id = ""
        self.login_url = "https://www.lgulife.com/login/"
        self.register_url = "https://www.lgulife.com/register/"
        self.__is_logged_in = False
        self.base_header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "en,zh-CN;q=0.9,zh;q=0.8",
            "Cache-Control": "max-age=0",
            "Connection": "keep-alive",
            "Host": "www.lgulife.com",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-User": "?1",
            "Upgrade-Insecure-Requests": "1",
            # The HTTP header is spelled with a hyphen; an underscore key is
            # just an unknown custom header and leaves the real agent unset.
            "User-Agent": self.random_user_agent(),
        }

    def __require_login(func):
        """
        A decorator to check the status of login before some actions that required login.

        When the instance is not logged in, a warning is printed and the
        wrapped method is not called; the wrapper then returns None.
        """
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            if not self.__is_logged_in:
                print("[WARNING] %s requires login, please use \"login\" first!" % func.__name__)
                return None
            return func(self, *args, **kwargs)

        return wrapper

    @staticmethod
    def random_user_agent():
        """
        Generate a random user agent.

        :return: A string of user agent.
        """
        return random.choice(lgulife_api._USER_AGENTS)

    def _build_header(self, extra):
        """
        Merge request-specific headers over a copy of base_header.

        dict.update() returns None, so the merge has to produce a new dict
        explicitly instead of chaining ``copy().update(...)``.

        :param extra: A dict of headers that override/extend base_header.
        :return: A new dict; base_header itself is left untouched.
        """
        return {**self.base_header, **extra}

    def _extract_csrf_token(self, page_text):
        """
        Read the CSRF form token out of an HTML page.

        :param page_text: HTML source of the page.
        :return: The value of the first csrfmiddlewaretoken input in document order.
        :raises IndexError: If the page has no csrfmiddlewaretoken input.
        """
        tokens = html.fromstring(page_text).xpath(self._CSRF_XPATH)
        if not tokens:
            raise IndexError("no csrfmiddlewaretoken input found in the page")
        return tokens[0]

    def _fetch_form_token(self, post_id):
        """
        Fetch the first page of a post and return the CSRF token of its form.

        :param post_id: The id of the post.
        :return: The CSRF form token.
        """
        return self._extract_csrf_token(self.get_post(post_id).text)

    @staticmethod
    def _report_status(response, success_message):
        """
        Print success_message on status 200, otherwise the error status code.

        :param response: Response from server.
        :param success_message: Text printed when the status code is 200.
        """
        if response.status_code == 200:
            print(success_message)
        else:
            print("[ERROR] Status code %d!" % response.status_code)

    # This method has not been tested yet.
    #
    # NOTE(review): the form below uses (None, value) tuples, which is the
    # shape requests documents for ``files=``; confirm the expected encoding
    # before enabling.
    #
    # def register(self, name, email, password):
    #     result = self._session.get(url=self.register_url, headers=self.base_header)
    #     self._cookies.update(dict(result.cookies))
    #     self.__token = self._extract_csrf_token(result.text)
    #     form_token = self.__token
    #     register_header = self._build_header({
    #         "Pragma": "no-cache",
    #         "Sec-Fetch-Site": "same-origin",
    #         "Origin": "https://www.lgulife.com",
    #         "Referer": self.register_url
    #     })
    #     register_form = (
    #         ("csrfmiddlewaretoken", (None, form_token)),
    #         ("username", (None, name)),
    #         ("email", (None, email)),
    #         ("password", (None, password)),
    #         ("confirm", (None, password)),
    #         ("invitation", (None, ""))
    #     )
    #     response = self._session.post(url=self.register_url, data=register_form,
    #                                   headers=register_header, cookies=self._cookies)
    #     return response

    def login(self, name, password):
        """
        Perform login action.

        The login page is fetched first to obtain the CSRF token, then the
        credentials are posted without following redirects; a 302 answer is
        treated as a successful login.

        :param name: User's name.
        :param password: User's password.
        :return: Response from server.
        :raises IndexError: If the login page carries no CSRF token.
        """
        self.__name = name
        self.__password = password
        result = self._session.get(url=self.login_url, headers=self.base_header)
        self.__token = self._extract_csrf_token(result.text)
        login_header = self._build_header({
            "Content-Type": "application/x-www-form-urlencoded",
            "Origin": "https://www.lgulife.com",
            "Referer": "https://www.lgulife.com/login/",
        })
        login_form = {
            "csrfmiddlewaretoken": self.__token,
            "action": "login",
            "name": self.__name,
            "password": self.__password,
        }

        response = self._session.post(url=self.login_url, data=login_form, headers=login_header,
                                      allow_redirects=False)
        if response.status_code == 302:
            print("Login as %s!" % self.__name)
            self.__is_logged_in = True
            self._cookies.update(dict(response.cookies))
            # NOTE(review): assumes the login cookie is named "sessionid"
            # (matching the header key used by the other methods) - confirm.
            self.__session_id = self._cookies.get("sessionid", self.__session_id)
        else:
            print("[ERROR] Status code %d!" % response.status_code)
        return response

    def get_post(self, post_id, page=1):
        """
        Get the information of a certain page of a post.

        :param post_id: The id of the post.
        :param page: Page number.
        :return: Response from server.
        """
        response = self._session.get(url="https://www.lgulife.com/bbs/post/%d/?page=%d" % (post_id, page))
        if response.status_code != 200:
            print("[ERROR] Status code %d!" % response.status_code)
        elif response.cookies:
            print("Successfully get post %d!" % post_id)
            self._cookies.update(dict(response.cookies))
        else:
            # A 200 answer that sets no cookie is reported as a missing post.
            print("[ERROR] Post %d may have been deleted!" % post_id)
        return response

    @__require_login
    def comment(self, post_id, text, thread_id=None):
        """
        For logged in user, post a reply under a post or under a reply to a post. By default, reply to a post.

        :param post_id: The id of the post.
        :param text: Comment context.
        :param thread_id: If the target is a reply to a post, then the id of the reply.
        :return: Response from server.
        """
        form_token = self._fetch_form_token(post_id)
        post_url = self._POST_URL % post_id
        comment_header = self._build_header({
            "Pragma": "no-cache",
            "Sec-Fetch-Site": "same-origin",
            "Origin": "https://www.lgulife.com",
            "Referer": post_url,
            "sessionid": self.__session_id,
        })
        # NOTE(review): the (None, value) tuples are the shape requests
        # documents for ``files=`` but are sent through ``data=``; the two
        # "action" entries and the "creat_post" spelling are kept exactly as
        # they were - confirm against what the server expects.
        comment_form = (
            ("csrfmiddlewaretoken", (None, form_token)),
            ("action", (None, "creat_post")),
            ("action", (None, "create_thread")),
            ("reply_to", (None, thread_id)),
            ("content", (None, text)),
            ("editor-html-code", (None, "<p>%s</p>" % text)),
            ("image", (None, "", "application/octet-stream"))
        )
        response = self._session.post(url=post_url, data=comment_form,
                                      headers=comment_header, cookies=self._cookies)
        self._report_status(response, "Successfully comment on %d!" % post_id)
        return response

    @__require_login
    def delete_comment(self, post_id, thread_id):
        """
        For logged in user, delete user's reply. Note that user can only delete his/her own reply.

        :param post_id: The id of the post.
        :param thread_id: The id of the reply (int, or the string returned by get_comment).
        :return: Response from server.
        """
        print("[WARNING] If you have insufficient permissions, this method will silently fail!")
        form_token = self._fetch_form_token(post_id)
        post_url = self._POST_URL % post_id
        delete_header = self._build_header({
            "Pragma": "no-cache",
            "Sec-Fetch-Site": "same-origin",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Origin": "https://www.lgulife.com",
            "Referer": post_url,
            "sessionid": self.__session_id,
        })
        delete_form = (
            ("csrfmiddlewaretoken", (None, form_token)),
            ("action", (None, "delete_thread")),
            ("thread_id", (None, str(thread_id)))
        )
        response = self._session.post(url=post_url, data=delete_form,
                                      headers=delete_header, cookies=self._cookies)
        # %s for thread_id: get_comment hands ids back as strings, and %d
        # would raise TypeError on them after the request was already sent.
        self._report_status(response, "Successfully delete comment %s in post %d!" % (thread_id, post_id))
        return response

    @__require_login
    def delete_post(self, post_id):
        """
        For logged in user, delete user's post. Note that user can only delete his/her own post.

        :param post_id: The id of the post.
        :return: Response from server.
        """
        print("[WARNING] If you have insufficient permissions, this method will silently fail!")
        form_token = self._fetch_form_token(post_id)
        post_url = self._POST_URL % post_id
        delete_header = self._build_header({
            "Pragma": "no-cache",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "Sec-Fetch-Site": "same-origin",
            "Origin": "https://www.lgulife.com",
            "Referer": post_url,
            "sessionid": self.__session_id,
        })
        delete_form = (
            ("csrfmiddlewaretoken", (None, form_token)),
            ("action", (None, "delete_post")),
        )
        response = self._session.post(url=post_url, data=delete_form,
                                      headers=delete_header, cookies=self._cookies)
        # Report non-200 answers too, like the other mutating methods do.
        self._report_status(response, "Successfully delete post %d!" % post_id)
        return response

    def _thread_groups(self, post_id, page):
        """
        Fetch one page of a post and return its "threads" containers.

        :param post_id: The id of the post.
        :param page: Page number.
        :return: The div.threads elements inside div#thread-list, or an empty
            list when the page has no div#thread-list.
        """
        # Name the parser explicitly: leaving it out makes BeautifulSoup guess
        # (and warn); lxml is already a dependency of this file.
        soup = bs(self.get_post(post_id, page).text, "lxml")
        container = soup.find("div", {"id": "thread-list"})
        if container is None:
            return []
        return container.find_all("div", {"class": "threads"})

    def get_comment(self, post_id, hot=False):
        """
        Collect all comments under a post.

        Pages are fetched one after another until a page's last "threads"
        group contains no reply button.

        :param post_id: The id of the post.
        :param hot: If True read the first "threads" group of each page, otherwise the last one.
        :return: A dataframe with one row per comment and the columns name,
            thread_id, content and quote (the reply this comment quotes, or ""). The
            frame is empty when the first page shows no thread groups.
        """
        index = 0 if hot else -1
        reply_buttons = []
        bodies = []
        page = 1
        groups = self._thread_groups(post_id, page)
        while groups:
            chosen = groups[index]
            reply_buttons.extend(chosen.find_all("span", {"class": "reply-btn"}))
            bodies.extend(chosen.find_all("div", {"class": "thread-body"}))
            page += 1
            groups = self._thread_groups(post_id, page)
            if groups and not groups[-1].find_all("span", {"class": "reply-btn"}):
                break

        names = [button["name"] for button in reply_buttons]
        thread_ids = [button["aria-valuetext"] for button in reply_buttons]
        # The comment text is the <p> children of the last <div> of each body.
        texts = ["\n".join(p.text for p in body.find_all("div")[-1].find_all("p"))
                 for body in bodies]
        quotes = []
        for body in bodies:
            creator = body.find("p", {"class": "replied-creator"})
            if creator is None:
                quotes.append("")
            else:
                quotes.append("".join([creator.text, body.find("p", {"class": None}).text]))

        return pd.DataFrame({"name": names, "thread_id": thread_ids, "content": texts, "quote": quotes})

    @__require_login
    def vote_post(self, post_id, up):
        """
        Vote up or down on a post. Note that this method cannot guarantee the final voting status.

        :param post_id: The id of the post.
        :param up: Whether to vote up.
        :return: Response from server.
        """
        print("[INFO] This method does not print your final vote!")
        post_url = self._POST_URL % post_id
        form_token = self._fetch_form_token(post_id)
        vote_header = self._build_header({
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Accept": "text/html, */*; q=0.01",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "X-Requested-With": "XMLHttpRequest",
            "Cache-Control": "no-cache",
        })
        vote_form = {
            "csrfmiddlewaretoken": form_token,
            "action": "post_vote",
            "attitude": "up" if up else "down",
        }
        return self._session.post(url=post_url, data=vote_form, headers=vote_header, cookies=self._cookies)

    @__require_login
    def vote_comment(self, post_id, thread_id, up):
        """
        Vote up or down on a comment.

        The server answer is parsed as JSON and the resulting vote state and
        counters are printed.

        :param post_id: The id of the post.
        :param thread_id: The id of the reply (int, or the string returned by get_comment).
        :param up: Whether to vote up.
        :return: Response from server.
        """
        post_url = self._POST_URL % post_id
        form_token = self._fetch_form_token(post_id)
        vote_header = self._build_header({
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "X-Requested-With": "XMLHttpRequest",
            "Cache-Control": "no-cache",
        })
        vote_form = {
            "csrfmiddlewaretoken": form_token,
            "type": "normal",
            "action": "thread_vote",
            # NOTE(review): vote_post sends "attitude"; the "attitute" key is
            # kept as it was - confirm which spelling the server reads.
            "attitute": "up" if up else "down",
            "thread_id": thread_id,
        }
        response = self._session.post(url=post_url, data=vote_form, headers=vote_header, cookies=self._cookies)
        info = json.loads(response.text)
        if not (info["thread_down"] or info["thread_up"]):
            outcome = "withdraw vote"
        elif info["thread_down"]:
            outcome = "downvote"
        else:
            outcome = "upvote"
        # %s for thread_id so ids taken from get_comment (strings) also work.
        print("Successfully %s on comment %s in post %d, now the comment has %d upvotes and %d downvotes!" % (
            outcome, thread_id, post_id, info["upvote_num"], info["downvote_num"]))
        return response

    @__require_login
    def like_post(self, post_id):
        """
        Toggle the like on a post.

        The request is posted to the site root with the post id in the form;
        the JSON answer tells whether the post is now liked and how many likes it has.

        :param post_id: The id of the post.
        :return: Response from server.
        """
        post_url = "https://www.lgulife.com/"
        form_token = self._fetch_form_token(post_id)
        like_header = self._build_header({
            "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
            "Sec-Fetch-Dest": "empty",
            "Sec-Fetch-Mode": "cors",
            "X-Requested-With": "XMLHttpRequest",
            "Cache-Control": "no-cache",
        })
        like_form = {
            "csrfmiddlewaretoken": form_token,
            "action": "post_upvote",
            "id": post_id,
        }
        response = self._session.post(url=post_url, data=like_form, headers=like_header, cookies=self._cookies,
                                      allow_redirects=False)
        info = json.loads(response.text)
        if info["user_liked"]:
            print("Successfully like post %d, now the post has %d likes!" % (post_id, info["like_num"]))
        else:
            print("Successfully unlike post %d, now the post has %d likes!" % (post_id, info["like_num"]))
        return response


热评 (1)


刘洋
179天前
查看回复(3)

楼主的学习与分享精神值得肯定,不过还是想在这里知会一下各位,尽量不要对本站进行过多非正常使用目的的访问,以免对其他用户造成体验影响。本站目前架构未进行前后端分离,requests爬虫会返回整个html页面,给服务端计算和网络传输造成较大影响。等前后端分离之后,直接请求我们的API会更合适。

6
0
回复

全部回帖 (7)


刘洋
179天前
查看回复(3)

楼主的学习与分享精神值得肯定,不过还是想在这里知会一下各位,尽量不要对本站进行过多非正常使用目的的访问,以免对其他用户造成体验影响。本站目前架构未进行前后端分离,requests爬虫会返回整个html页面,给服务端计算和网络传输造成较大影响。等前后端分离之后,直接请求我们的API会更合适。

6
0
回复 # 1

work007
179天前
查看对话

刘洋:

楼主的学习与分享精神值得肯定,不过还是想在这里知会一下各位,尽量不要对本站进行过多非正常使用目的的访

命令行版LGULife指日可待?

0
0
回复 # 2

若愚是一只鱼🐟
179天前
查看对话

刘洋:

楼主的学习与分享精神值得肯定,不过还是想在这里知会一下各位,尽量不要对本站进行过多非正常使用目的的访

每日一问:今天 LGU·Life 前后端分离了吗?

0
0
回复 # 3

tideillusion
楼主
179天前
查看对话

刘洋:

楼主的学习与分享精神值得肯定,不过还是想在这里知会一下各位,尽量不要对本站进行过多非正常使用目的的访

期待ing

0
0
回复 # 4

卡Q因
179天前

再爬土豆🥔可撑不住了啊,求放过

0
0
回复 # 5

圆圆圆
178天前

刀架ly脖子上了

什么时候换土豆啊

0
0
回复 # 6

吃瓜不停
178天前

快进到DDOS

0
0
回复 # 7


    请登录参与讨论 :)