草榴论坛(1024) 自动回贴 py

import requests
import pickle
import random
import re
import os
import json
import onetimepass as otp
from time import sleep
from typing import BinaryIO , Dict , List , Union
import base64
import logging

#从 https://truecaptcha.org/ 处注册获取 userid 和 apikey
userid : str = "xxxx"
apikey : str = "xxxx"

#用户名 密码 2FA 密钥 多个账号用空格分隔
user : str = "xxxx xxxx"
password : str = "xxxx xxxx"
secret : str = "xxxx xxxx"

#循环间隔
PollingTime : int = 5
#回复次数限制
ReplyLimit : int = 10
#禁止版主帖子回复
Forbid : bool = True
#手动输入验证码
Input_self : bool = False
#每日点赞
Like : bool = True
#时间间隔最小
TimeIntervalStart : bool = 1024
#时间间隔最大
TimeIntervalEnd : bool = 2048
#回复内容
ReplyContent : List = ['感谢分享','感谢你的分享','谢谢分享','多谢分享','感谢作者的分享','谢谢坛友分享','内容精彩','的确如此','感谢分享','涨知识了','很有意思']


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
FORMAT = '%(asctime)s %(message)s'
logging.basicConfig(format=FORMAT)

def save_cookies(session : requests.Session , filename : str) -> None:
    with open(filename, 'wb') as f:
        try:
            pickle.dump(session.cookies, f)
            logger.debug(f"save {filename} success")
        except:
            ...

def load_cookies(session : requests.Session , filename : str) -> None:
    with open(filename, 'rb') as f:
        try:
            session.cookies.update(pickle.load(f))
            logger.debug(f"load {filename} success")
        except:
            ...

def apitruecaptcha(content : BinaryIO) -> str:
    image=base64.b64encode(content)
    url='https://api.apitruecaptcha.org/one/gettext'
    data={'data':str(image,'utf-8'),
        'userid':userid,
        'apikey':apikey
    }
    result = requests.post(url, json.dumps(data))
    res=result.json()
    try:
        code = res['result']
    except:
        logger.debug(f"api error,{str(res)}")
        code = "XXXX"
    logger.debug("apitruecaptcha code: %s" % code)
    return code

def ttshitu(content : BinaryIO) -> str:
    image=base64.b64encode(content)
    host='http://api.ttshitu.com/base64'
    headers={'Content-Type':'application/json;charset=UTF-8'}
    data={
        'username': userid ,
        'password': apikey ,
        'image':image.decode('utf-8')
    }
    res=requests.post(url=host,data=json.dumps(data))
    res=res.text
    res=json.loads(res)
    res=res['data']['result']
    return res

class User:
    Like : bool = False
    Retry : int = 5
    RetryList : int = 10
    Host : str = "https://t66y.com/"
    Index : str = f"{Host}index.php"
    Login : str = f"{Host}login.php"
    Post : str = f"{Host}post.php?"
    Today : str = f"{Host}thread0806.php?fid=7&search=today"
    VerCode : str = f"{Host}require/codeimg.php?"
    Api : str = f"{Host}api.php"
    UserAgent : str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/90.0.4430.212 Safari/537.36"
    ReplyList : List = []
    Invalid : bool = False
    SleepTime : int = 0
    ReplyCount : int = ReplyLimit
    Headers : Dict = {'Host': Host.split("//")[1].replace("/", ""),'Proxy-Connection':'keep-alive','Referer': Login,'User-Agent': UserAgent
    }

    def __init__(self, username : str , password : str , secret : str) -> None:
        self.username : str = username
        self.password : str = password
        self.secret : str = secret
        self.cookies : requests.cookies = None
        self.s : requests.Session = requests.Session()
        file : str = f"./{username}.cookies"
        if os.path.exists(file):
            load_cookies(self.s, file)
            if self.is_valid_cookies():
                logger.info(f"{self.username} cookies is valid , login success")
                self.cookies = self.s.cookies
                return
            else:
                logger.info(f"{self.username} cookies is invalid , login again")
        if self.login():
            logger.info(f"{self.username} login success")
            self.cookies = self.s.cookies
            save_cookies(self.s, file)
        else:
            logger.info(f"{self.username} login failed")
            self.Invalid = True

    def login(self) -> None:
        def login1():
            sleep(2)
            data : Dict = {
            'pwuser': self.username,
            'pwpwd':  self.password,
            'hideid': '0',
            'cktime': '0',
            'forward': self.Post,
            'jumpurl': self.Post,
            'step': '2'
            }
            login = self.s.post(self.Login , headers = self.Headers , data = data)
            return login.text

        def login2():
            sleep(2)
            token = otp.get_totp(self.secret)
            data = {
            'step': '2',
            'cktime': '0',
            'oneCode': token
            }
            login=self.s.post(self.Login , headers = self.Headers , data = data)
            return login.text

        def captcha():
            sleep(2)
            code = random.uniform(0,1)
            code = round(code,16)
            VerCode = self.VerCode + str(code)
            image = self.s.get(VerCode , headers = self.Headers)
            if Input_self:
                with open("./captcha.png", "wb") as f:
                    f.write(image.content)
                vercode = input("请输入验证码:")
            else:
                vercode = apitruecaptcha(image.content)
            data={'validate': vercode}
            login = self.s.post(self.Login , data = data , headers = self.Headers)
            if "驗證碼不正確,請重新填寫" in login.text:
                logger.info(f"{self.username} captcha with code: {vercode} failed")
            else:
                logger.info(f"{self.username} captcha with code: {vercode} success") 

        res = login1()
        if res.find("賬號已開啟兩步驗證") != -1:
            res = login2()
            if res.find("您已經順利登錄") != -1:
                return True
        elif res.find("登录尝试次数过多") != -1:
            captcha()
            logger.debug(f"{self.username} complete captcha")
            # 請輸入您的帳號與密碼
        
        if self.Retry > 0:
            logger.debug(f"{self.username} retry login,remaining retry times: %d" % self.Retry)
            self.Retry -= 1
            if self.login():
                return True
        else:
            logger.debug(f"{self.username} login failed!")
            return False

    #校验 cookies 是否有效
    def is_valid_cookies(self):
        sleep(2)
        res = self.s.get(self.Index , headers = self.Headers)
        if res.text.find("上次登錄時間") != -1:
            return True
        else:
            return False

    def reply(self, url) -> None:
        sleep(2)
        if self.ReplyCount == 0:
            logger.info(f"{self.username} reply completed")
            return
        title = self.get_title(url)
        content = self.get_reply_content()
        tid = self.get_tid(url)
        data = {
            'atc_usesign':'1',
            'atc_convert':'1',
            'atc_autourl': '1',
            'atc_title':  title,
            'atc_content': content ,
            'step': '2',
            'action': 'reply',
            'fid': '7',
            'tid':  tid,
            'atc_attachment': 'none',
            'pid':'','article':'',
            'touid':'','verify':'verify'
        }
        res = requests.post(url = self.Post , data = data , headers = self.Headers , cookies = self.cookies)
        if res.text.find("發貼完畢點擊進入主題列表") != -1:
            self.ReplyCount -= 1
            logger.info(f"{self.username} reply {title} with {content} success , remaining reply times: %d" % self.ReplyCount)
            return True
        elif res.text.find("所屬的用戶組") != -1:
            logger.debug(f"{self.username} reply failed , usergroup is not allowed")
            return False
        elif res.text.find("管理員禁言, 類型為永久禁言") != -1:
            logger.debug(f"{self.username} reply failed , user is banned")
            return False
        else:
            logger.debug(f"{self.username} reply failed , unknown error")
            logger.error(res.text)
            return False

    def like(self, url : str) -> bool:
        if not Like or self.Like:
            return
        id = self.get_reply_id(url)
        if not id:
            return
        sleep(2)
        data={
            'action': 'clickLike',
            'id': id,
            'page': 'h',
            'json': '1',
            'url': url
        } 
        res = requests.post(self.Api , headers = self.Headers , data = data , cookies = self.cookies)
        try:
            if int(json.loads(res.text)['myMoney']) > 0 :
                logger.debug(f"{self.username} like success")
        except:
            return

    #简单浏览
    def browse(self, url : str) -> None:
        sleep(2)
        res = requests.get(url = url , headers = self.Headers , cookies = self.cookies)
    
    #获取今日帖子
    def get_today_list(self):
        sleep(2)
        res = requests.get(self.Today , headers = self.Headers)
        content = res.text

        pat_title : str = ('htm_data/\w+/\w+/\w+.html')
        pat_moderator : str = "版主:([\s\S]*?)<\/span>"
        pat_username : str = "username=(\w+)"
        pat_user : str = 'class="bl">(.*)?</a>'
        pat_all_title : str = '<h3><a href="([\s\S]*?)" rel="external nofollow" '
        moderator : str = re.search(pat_moderator, content).group(0)
        username : List = re.findall(pat_username, moderator)
        content = res.text[res.text.find('普通主題'):]
        all_username : List = re.findall(pat_user, content)
        title = re.findall(pat_title , content)
        all_title = re.findall(pat_all_title , content)
        logger.debug(f"{self.username} get list number: {str(len(title))}")

        if len(all_title) != len(all_username):
            if self.RetryList > 0:
                logger.debug(f"{self.username} get list number error , retry get list , remaining retry times: %d" % self.RetryList)
                self.RetryList -= 1
                sleep(2)
                self.get_today_list()
                return
            else:
                os._exit(0)

        black_list : List = []
        if Forbid:
            logger.info("moderator list:" + str(" ".join(username)))
            for index in range(len(all_username)):
                if all_username[index] in moderator:
                    black_list.append(all_title[index])
            for item in black_list:
                try:
                    title.remove(item)
                except:
                    ...
                logger.debug(f"{self.username} remove {item} from list")
        self.ReplyList = title
        logger.debug(f"{self.username} get reply list number {str(len(title))}")

    #从今日列表中抽取出一个帖子
    def get_one_link(self) -> Union[str , None]:
        if len(self.ReplyList) == 0:
            return None
        url = self.ReplyList[random.randint(0,len(self.ReplyList)-1)]
        logger.debug(f"{self.username} get one link: {url}")
        self.ReplyList.remove(url)
        return f"{self.Host}{url}"

    #从 url 中提取出 tid
    def get_tid(self , url : str) -> str:
        pat_tid = "/(\d+).html"
        tid = re.search(pat_tid , url).group(1)
        return tid

    #获取回复内容的 id 用于点赞
    def get_reply_id(self, url : str) -> Union[str , None]:
        sleep(2)
        res = requests.get(url = url , headers = self.Headers , cookies = self.cookies)
        reply_id_list = re.findall("<a name=#(\d+)><\/a>", res.text)
        if len(reply_id_list) == 0:
            return None
        else:
            return reply_id_list[random.randint(0,len(reply_id_list)-1)]

    #获取给定 url 的主题名字
    def get_title(self , url : str) -> str:
        sleep(2)
        res = requests.get(url = url , headers = self.Headers , cookies = self.cookies)
        pat_title = '<b> 本頁主題:</b> .*</td>'
        try:
            title = re.search(pat_title, res.text).group(0)
            title = "Re:" + title.replace('<b> 本頁主題:</b>','').replace('</td>','')
        except:
            title = "Re:"
        return title

    #随机获取回复内容
    def get_reply_content(self) -> str:
        return ReplyContent[random.randint(0,len(ReplyContent)-1)]

    # 获取回复次数
    def get_reply_number(self) -> str:
        sleep(2)
        res = requests.get(self.Index , headers = self.Headers , cookies = self.cookies)
        pat_reply_number = "共發表帖子: \d{1,5}"
        reply_number = re.search(pat_reply_number , res.text).group(0).replace('共發表帖子:','')
        return reply_number

    def get_username(self) -> str:
        return self.username

    def set_invalid(self) -> None:
        logger.info(f"{self.username} is invalid")
        self.Invalid = True

    def get_invalid(self) -> bool:
        return self.Invalid

    def set_sleep_time(self , time : int) -> None:
        self.SleepTime = time

    def get_sleep_time(self) -> int:
        return self.SleepTime

userlist=user.split()
passwordlist=password.split()
secretlist=secret.split()

users = []
for i in range(len(userlist)):
    user=User(userlist[i],passwordlist[i],secretlist[i])
    user.get_today_list()
    users.append(user)

while True:
    return_flag = True
    for user in users:        
        if user.get_invalid():
            continue
        else:
            return_flag = False
        if user.get_sleep_time() > 0:
            user.set_sleep_time(user.get_sleep_time() - PollingTime)
            continue
        url = user.get_one_link()
        if url is None:
            user.set_invalid()
            continue
        user.browse(url)
        if not user.reply(url):
            user.set_invalid()
            continue        
        user.like(url)
        sleep_time = random.randint(TimeIntervalStart,TimeIntervalEnd)
        logger.info(f"{user.get_username()} sleep {sleep_time} seconds")
        user.set_sleep_time(sleep_time)

    if return_flag:
        os._exit(0)

    sleep(PollingTime)

正文完
 
admin
版权声明:本站原创文章,由 admin 2023-01-28发表,共计9452字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。