詹学伟
詹学伟
Published on 2025-09-05 / 12 Visits
0
0

Selenuim自动化

一、背景和说明

该项目是朋友公司的需求,他们公司有个系统,系统中的数据不对,需要修正。

正确的数据在本地excel文件中,大约有20w条,文件中有每条数据的编号(唯一),然后是其他字段。

刚开始他们使用手动一条一条操作,后来发现数据太多,根本无法短时间内完成。

由于系统是其他部门开发的,他们没有修改数据的接口,所以只能通过自动化模拟正常用户进入系统操作。

二、我的思路

我的第一想法拿到对应的数据库信息(可写),但是朋友说只有一个可读的用户。

第二个想法是通过对应api来修正数据,但是朋友说对应的开发没有相关api。

于是就想到了使用自动化模拟用户操作,从而修正数据。

三、脚本代码

import logging
import threading
import time
from logging.handlers import RotatingFileHandler
from queue import Queue

import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, NoAlertPresentException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait


# 配置日志记录
def setup_logger():
    logger = logging.getLogger('selenium_automation')
    logger.setLevel(logging.INFO)

    # 创建文件处理器,限制每个日志文件大小为5MB,保留3个备份
    file_handler = RotatingFileHandler(
        'automation.log',
        maxBytes=5 * 1024 * 1024,
        backupCount=3,
        encoding='utf-8'
    )
    file_handler.setLevel(logging.INFO)

    # 创建控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # 创建格式化器
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # 添加格式化器到处理器
    file_handler.setFormatter(formatter)
    console_handler.setFormatter(formatter)

    # 添加处理器到日志器
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    return logger


# 初始化日志
logger = setup_logger()

# 配置信息
WEBSITE_URL = "https://xxxxxx"
USERNAME = "xxx"
PASSWORD = "xxx"
EXCEL_FILE = "/Users/zhanxuewei/Documents/data2.xlsx"
SHEET_NAME = "Sheet1"
THREAD_COUNT = 4  # 线程数量


class WebDriverWorker(threading.Thread):
    def __init__(self, queue, thread_id):
        threading.Thread.__init__(self)
        self.queue = queue
        self.thread_id = thread_id
        self.driver = None

    def run(self):
        chrome_options = Options()
        # 禁用所有弹窗和通知
        chrome_options.add_argument("--disable-notifications")
        chrome_options.add_argument("--disable-popup-blocking")

        # 禁用自动化控制提示
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)

        # 隐藏"Chrome正在被自动化软件控制"提示
        chrome_options.add_argument("--disable-infobars")

        # 无头模式可选(如需)
        # chrome_options.add_argument("--headless")
        chrome_options.add_argument("--headless")
        chrome_options.add_experimental_option(
            "prefs", {"profile.managed_default_content_settings.images": 2}
        )
        # self.driver = webdriver.Chrome(options=chrome_options)
        self.driver = webdriver.Chrome()
        try:
            self.login()
            self.process_queue()
        finally:
            if self.driver:
                self.driver.quit()

    def login(self):
        """登录系统"""
        logger.info(f"{threading.current_thread().name} 正在登录系统...")
        self.driver.get(WEBSITE_URL)
        self.driver.maximize_window()

        username_input = WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='请输入用户名']"))
        )
        password_input = self.driver.find_element(By.CSS_SELECTOR,
                                                  "input[placeholder='密码长度为8-15且包含字母、数字、特殊字符']")
        login_button = self.driver.find_element(By.CSS_SELECTOR, "button[type='button']")

        username_input.send_keys(USERNAME)
        time.sleep(1)
        password_input.send_keys(PASSWORD)
        time.sleep(1)
        login_button.click()
        time.sleep(5)

        # 进入资源管理菜单
        resource_menu = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//div[contains(text(),'资源管理')]"))
        )
        resource_menu.click()
        time.sleep(2)

        # 进入修改资源菜单
        update_resource_menu = WebDriverWait(self.driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//span[contains(text(),'资源修改')]"))
        )
        update_resource_menu.click()
        time.sleep(2)
        self.driver.switch_to.frame(0)

    def process_queue(self):
        """处理队列中的数据"""
        while not self.queue.empty():
            survey_no, category_code, resource_name, to_east, to_south,to_north, to_west, resource_area, work_type = self.queue.get()

            try:
                logger.info(f"{threading.current_thread().name} 正在处理测绘编号: {survey_no}")
                # 展开查询条件
                search_btn = WebDriverWait(self.driver, 10).until(
                    # EC.element_to_be_clickable((By.CSS_SELECTOR, "span#search"))
                    EC.element_to_be_clickable((By.XPATH, "//span[@id='search']"))
                )
                search_btn.click()
                time.sleep(2)

                # 输入测绘编号查询
                search_input = WebDriverWait(self.driver, 10).until(
                    # EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='dkbm']"))
                    EC.presence_of_element_located((By.XPATH, "//input[@id='textfield']"))
                )
                search_input.clear()
                search_input.send_keys(str(survey_no))

                # 点击查询按钮
                search_button = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, "//a[contains(text(),'查 询')]"))
                )
                search_button.click()
                time.sleep(2)

                # 点击修改按钮
                edit_button = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, "//table//a[text()='修改']"))
                )
                edit_button.click()
                time.sleep(1)

                # 1.资源名称
                resource_name_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='resourcesName']"))
                )
                resource_name_input.clear()
                resource_name_input.send_keys(resource_name)

                # 2.资源类型 TODO
                # 3.面积
                resource_area_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='resourcesArea']"))
                )
                resource_area_input.clear()
                resource_area_input.send_keys(resource_area)
                # 4.经营方式 TODO
                # 5.四至东
                dongzhi_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='dongzhi']"))
                )
                dongzhi_input.clear()
                dongzhi_input.send_keys(to_east)

                # 5.四至南
                nanzhi_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='nanzhi']"))
                )
                nanzhi_input.clear()
                nanzhi_input.send_keys(to_south)

                # 5.四至西
                xizhi_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='xizhi']"))
                )
                xizhi_input.clear()
                xizhi_input.send_keys(to_north)

                # 5.四至北
                beizhi_input = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, "input[id='beizhi']"))
                )
                beizhi_input.clear()
                beizhi_input.send_keys(to_west)

                # 保存修改
                save_button = WebDriverWait(self.driver, 10).until(
                    EC.presence_of_element_located((By.XPATH, "//table//a[@id='zicbdsq_list2_saveBtn']"))
                )
                # save_button = self.driver.find_element(By.XPATH, "//table//a[@id='zicbdsq_list2_saveBtn']")
                save_button.click()
                time.sleep(0.5)

                try:
                    WebDriverWait(self.driver, 1).until(EC.alert_is_present())
                    self.driver.switch_to.alert.accept()
                    time.sleep(0.5)
                except (NoAlertPresentException, TimeoutException):
                    logger.error(f"{threading.current_thread().name} 弹出框alter异常: {survey_no} -> {resource_name}")
                    pass

                logger.info(f"{threading.current_thread().name} 成功处理: {survey_no} -> {resource_name}")

            except (TimeoutException, NoSuchElementException) as e:
                logger.error(f"线程{self.thread_id} 处理测绘编号 {survey_no} 时出错: {str(e)}")
            finally:
                self.queue.task_done()


def main():
    # 读取Excel数据
    logger.info("正在读取Excel数据...")
    df = pd.read_excel(EXCEL_FILE, sheet_name=SHEET_NAME, header=1)
    data = [(row[df.columns[0]], row[df.columns[1]], row[df.columns[2]], row[df.columns[4]], row[df.columns[5]],
             row[df.columns[6]], row[df.columns[7]], row[df.columns[8]], row[df.columns[10]]) for _, row in
            df.iterrows()
            if pd.notna(row[df.columns[0]]) and pd.notna(row[df.columns[2]])]

    if not data:
        logger.warning("Excel没有需要处理数据")
        return

    # 创建多个队列,每个线程一个队列
    queues = [Queue() for _ in range(THREAD_COUNT)]

    # 将数据均匀分配到各个队列
    for i, item in enumerate(data):
        queues[i % THREAD_COUNT].put(item)

    # 创建并启动工作线程
    threads = []
    for i in range(THREAD_COUNT):
        time.sleep(2)
        worker = WebDriverWorker(queues[i], i + 1)
        worker.start()
        threads.append(worker)

    # 等待所有队列处理完成
    for q in queues:
        q.join()

    # 等待所有线程完成
    for t in threads:
        t.join()

    logger.info("所有数据处理完成!")


if __name__ == "__main__":
    main()

四、结果展示

这是第四个版本,在第一个版本上有很多优化,一个是excel解析的优化,二个是xpath定位dom元素的优化,三个是添加了多线程....

中途有卡壳,就是xpath定位一个iframe内的dom元素的时候

完成总耗时大约2h,感觉还是挺有意思的


Comment