Cách sử dụng ThreadPoolExecutor trong Python 3
Các chuỗi Python là một dạng song song cho phép chương trình của bạn chạy nhiều thủ tục cùng một lúc. Tính song song trong Python cũng có thể đạt được bằng cách sử dụng nhiều quy trình, nhưng các stream đặc biệt phù hợp để tăng tốc các ứng dụng liên quan đến lượng I / O (đầu vào / kết quả ) đáng kể.Ví dụ về hoạt động liên kết I / O bao gồm thực hiện các yêu cầu web và đọc dữ liệu từ các file . Ngược lại với các hoạt động ràng buộc I / O, các hoạt động ràng buộc CPU (như thực hiện phép toán với thư viện chuẩn Python) sẽ không được hưởng lợi nhiều từ các stream Python.
Python 3 bao gồm tiện ích ThreadPoolExecutor
để thực thi mã trong một stream .
Trong hướng dẫn này, ta sẽ sử dụng ThreadPoolExecutor
để thực hiện các yêu cầu mạng một cách nhanh chóng. Ta sẽ xác định một hàm rất phù hợp để gọi trong các stream , sử dụng ThreadPoolExecutor
để thực thi chức năng đó và xử lý kết quả từ các lần thực thi đó.
Đối với hướng dẫn này, ta sẽ thực hiện các yêu cầu mạng để kiểm tra sự tồn tại của các trang Wikipedia .
Lưu ý: Thực tế là các hoạt động liên kết I / O được hưởng lợi nhiều hơn từ các stream so với các hoạt động liên kết CPU là do một đặc điểm riêng trong Python được gọi là khóa thông dịch toàn cục . Nếu muốn, bạn có thể tìm hiểu thêm về khóa thông dịch global của Python trong tài liệu Python chính thức .
Yêu cầu
Để tận dụng tối đa hướng dẫn này, bạn nên làm quen với lập trình bằng Python và môi trường lập trình Python local với requests
được cài đặt.
Bạn có thể xem lại các hướng dẫn này để biết thông tin cơ bản cần thiết:
- Cách viết mã bằng Python 3
Cách cài đặt Python 3 và cài đặt môi trường lập trình local trên Ubuntu 18.04
Để cài đặt gói
requests
vào môi trường lập trình Python local của bạn, bạn có thể chạy lệnh sau:
- pip install --user requests==2.23.0
Bước 1 - Xác định một hàm để thực thi trong chuỗi
Hãy bắt đầu bằng cách xác định một hàm mà ta muốn thực thi với sự trợ giúp của các stream .
Sử dụng nano
hoặc môi trường phát triển / soạn thảo văn bản bạn muốn , bạn có thể mở file này:
- nano wiki_page_function.py
Đối với hướng dẫn này, ta sẽ viết một hàm xác định xem trang Wikipedia có tồn tại hay không:
import requests def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status
Các get_wiki_page_existence
chức năng chấp nhận hai đối số: một URL đến một trang Wikipedia ( wiki_page_url
), và một timeout
số giây để chờ đợi một phản ứng từ URL đó.
get_wiki_page_existence
sử dụng gói requests
để thực hiện yêu cầu web tới URL đó. Tùy thuộc vào mã trạng thái của response
HTTP, một chuỗi được trả về mô tả trang có tồn tại hay không. Các mã trạng thái khác nhau thể hiện các kết quả khác nhau của một yêu cầu HTTP. Quy trình này giả định mã trạng thái 200
"thành công" nghĩa là trang Wikipedia tồn tại và mã trạng thái 404
"không tìm thấy" nghĩa là trang Wikipedia không tồn tại.
Như được mô tả trong phần Yêu cầu , bạn cần cài đặt gói requests
để chạy chức năng này.
Hãy thử chạy hàm bằng cách thêm url
và lệnh gọi hàm sau hàm get_wiki_page_existence
:
. . . url = "https://en.wikipedia.org/wiki/Ocean" print(get_wiki_page_existence(wiki_page_url=url))
Khi bạn đã thêm mã, hãy lưu file .
Nếu ta chạy mã này:
- python wiki_page_function.py
Ta sẽ thấy kết quả như sau:
Outputhttps://en.wikipedia.org/wiki/Ocean - exists
Việc gọi hàm get_wiki_page_existence
với một trang Wikipedia hợp lệ sẽ trả về một chuỗi xác nhận trên thực tế, trang đó tồn tại.
Cảnh báo: Nói chung, không an toàn khi chia sẻ các đối tượng hoặc trạng thái Python giữa các stream mà không cần chú ý đặc biệt để tránh các lỗi đồng thời. Khi xác định một hàm để thực thi trong một stream , cách tốt nhất là xác định một hàm thực hiện một công việc duy nhất và không chia sẻ hoặc xuất bản trạng thái cho các stream khác. get_wiki_page_existence
là một ví dụ về một hàm như vậy.
Bước 2 - Sử dụng ThreadPoolExecutor để thực thi một chức năng trong chủ đề
Bây giờ ta có một hàm rất phù hợp để gọi với các stream , ta có thể sử dụng ThreadPoolExecutor
để thực hiện nhanh nhiều lệnh gọi của hàm đó.
Hãy thêm đoạn mã được đánh dấu sau vào chương trình của bạn trong wiki_page_function.py
:
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result())
Ta hãy xem cách mã này hoạt động:
-
concurrent.futures
được nhập để cung cấp cho ta quyền truy cập vàoThreadPoolExecutor
. - Câu lệnh
with
được sử dụng để tạo một trìnhexecutor
cá thểThreadPoolExecutor
sẽ nhanh chóng dọn dẹp các stream sau khi hoàn thành. - Bốn công việc được
submitted
cho ngườiexecutor
: một công việc cho mỗi URL trong danh sáchwiki_page_urls
. - Mỗi cuộc gọi để
submit
trả về một versionFuture
được lưu trữ trong danh sáchfutures
. - Hàm
as_completed
đợi mỗiget_wiki_page_existence
gọiget_wiki_page_existence
Future
hoàn tất để ta có thể in kết quả của nó.
Nếu ta chạy lại chương trình này, với lệnh sau:
- python wiki_page_function.py
Ta sẽ thấy kết quả như sau:
Outputhttps://en.wikipedia.org/wiki/Island - exists https://en.wikipedia.org/wiki/Ocean - exists https://en.wikipedia.org/wiki/this_page_does_not_exist - does not exist https://en.wikipedia.org/wiki/Shark - exists
Kết quả này có ý nghĩa: 3 trong số các URL là trang Wikipedia hợp lệ và một trong số chúng this_page_does_not_exist
thì không. Lưu ý kết quả của bạn có thể được đặt hàng khác với kết quả này. Hàm concurrent.futures.as_completed
trong ví dụ này trả về kết quả ngay khi chúng có sẵn, dù công việc được gửi theo thứ tự nào.
Bước 3 - Xử lý ngoại lệ từ các hàm chạy trong chuỗi
Trong bước trước, get_wiki_page_existence
đã trả về thành công một giá trị cho tất cả các lệnh gọi của ta . Trong bước này, ta sẽ thấy rằng ThreadPoolExecutor
cũng có thể nêu ra các ngoại lệ được tạo trong các lệnh gọi hàm stream .
Hãy xem xét khối mã ví dụ sau:
import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = [ "https://en.wikipedia.org/wiki/Ocean", "https://en.wikipedia.org/wiki/Island", "https://en.wikipedia.org/wiki/this_page_does_not_exist", "https://en.wikipedia.org/wiki/Shark", ] with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append( executor.submit( get_wiki_page_existence, wiki_page_url=url, timeout=0.00001 ) ) for future in concurrent.futures.as_completed(futures): try: print(future.result()) except requests.ConnectTimeout: print("ConnectTimeout.")
Khối mã này gần giống với khối mà ta đã sử dụng ở Bước 2, nhưng nó có hai điểm khác biệt chính:
- Bây giờ ta vượt qua
timeout=0.00001
đểget_wiki_page_existence
. Vì góirequests
sẽ không thể hoàn thành yêu cầu web của nó tới Wikipedia trong0.00001
giây, nên nó sẽ đưa ra một ngoại lệConnectTimeout
. - Ta bắt các ngoại lệ
ConnectTimeout
được nêu ra bởifuture.result()
và in ra một chuỗi mỗi lần ta làm như vậy.
Nếu ta chạy lại chương trình, ta sẽ thấy kết quả sau:
OutputConnectTimeout. ConnectTimeout. ConnectTimeout. ConnectTimeout.
Bốn thông báo ConnectTimeout
được in — một thông báo cho mỗi wiki_page_urls
của ta , vì không ai trong số chúng có thể hoàn thành trong 0.00001
giây và mỗi lệnh trong bốn get_wiki_page_existence
gọi get_wiki_page_existence
đã nêu ra ngoại lệ ConnectTimeout
.
Đến đây bạn đã thấy rằng nếu một lệnh gọi hàm được gửi đến ThreadPoolExecutor
tạo ra một ngoại lệ, thì ngoại lệ đó có thể được nâng lên bình thường bằng cách gọi Future.result
. Gọi Future.result
trên tất cả các lệnh gọi đã gửi của bạn đảm bảo chương trình của bạn sẽ không bỏ lỡ bất kỳ trường hợp ngoại lệ nào được nêu ra từ hàm stream của bạn.
Bước 4 - So sánh thời gian thực thi có và không có stream
Bây giờ hãy xác minh việc sử dụng ThreadPoolExecutor
thực sự làm cho chương trình của bạn nhanh hơn.
Đầu tiên, hãy dành thời gian get_wiki_page_existence
nếu ta chạy nó mà không có chuỗi:
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running without threads:") without_threads_start = time.time() for url in wiki_page_urls: print(get_wiki_page_existence(wiki_page_url=url)) print("Without threads time:", time.time() - without_threads_start)
Trong ví dụ mã, ta gọi hàm get_wiki_page_existence
với năm mươi URL trang Wikipedia khác nhau từng cái một. Ta sử dụng hàm time.time()
để in ra số giây cần thiết để chạy chương trình của ta .
Nếu ta chạy lại mã này như trước, ta sẽ thấy kết quả như sau:
OutputRunning without threads: https://en.wikipedia.org/wiki/0 - exists https://en.wikipedia.org/wiki/1 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Without threads time: 5.803015232086182
Mục 2–47 trong kết quả này đã được bỏ qua cho ngắn gọn.
Số giây được in sau Without threads time
sẽ khác khi bạn chạy nó trên máy của bạn — không sao cả, bạn chỉ nhận được một số cơ sở để so sánh với giải pháp sử dụng ThreadPoolExecutor
. Trong trường hợp này, nó là ~5.803
giây.
Hãy chạy cùng năm mươi URL Wikipedia thông qua get_wiki_page_existence
, nhưng lần này sử dụng ThreadPoolExecutor
:
import time import requests import concurrent.futures def get_wiki_page_existence(wiki_page_url, timeout=10): response = requests.get(url=wiki_page_url, timeout=timeout) page_status = "unknown" if response.status_code == 200: page_status = "exists" elif response.status_code == 404: page_status = "does not exist" return wiki_page_url + " - " + page_status wiki_page_urls = ["https://en.wikipedia.org/wiki/" + str(i) for i in range(50)] print("Running threaded:") threaded_start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for url in wiki_page_urls: futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url)) for future in concurrent.futures.as_completed(futures): print(future.result()) print("Threaded time:", time.time() - threaded_start)
Mã này giống như mã ta đã tạo ở Bước 2, chỉ với việc bổ sung một số câu lệnh in cho ta biết số giây cần thiết để thực thi mã của ta .
Nếu ta chạy lại chương trình, ta sẽ thấy như sau:
OutputRunning threaded: https://en.wikipedia.org/wiki/1 - exists https://en.wikipedia.org/wiki/0 - exists . . . https://en.wikipedia.org/wiki/48 - exists https://en.wikipedia.org/wiki/49 - exists Threaded time: 1.2201685905456543
, số giây được in sau Threaded time
sẽ khác trên máy tính của bạn (cũng như thứ tự kết quả của bạn).
Đến đây bạn có thể so sánh thời gian thực hiện để tìm nạp 50 URL trang Wikipedia có và không có chuỗi.
Trên máy được sử dụng trong hướng dẫn này, không có stream mất ~5.803
giây và có stream mất ~1.220
giây. Chương trình của ta chạy nhanh hơn đáng kể với các chủ đề.
Kết luận
Trong hướng dẫn này, bạn đã học cách sử dụng trình ThreadPoolExecutor
trong Python 3 để chạy mã bị ràng buộc I / O một cách hiệu quả. Bạn đã tạo một hàm rất phù hợp để gọi trong các stream , học cách truy xuất cả kết quả và ngoại lệ từ các lần thực thi theo stream của hàm đó và quan sát hiệu suất tăng lên khi sử dụng các stream .
Từ đây, bạn có thể tìm hiểu thêm về các hàm đồng thời khác được cung cấp bởi mô-đun concurrent.futures
.
Các tin liên quan
Cách sử dụng module sqlite3 trong Python 32020-06-02
Cách thiết lập notebook Jupyter với Python 3 trên Ubuntu 20.04 và Kết nối qua Đường hầm SSH
2020-05-19
Cách cài đặt Phân phối Python Anaconda trên Ubuntu 20.04 [Khởi động nhanh]
2020-05-19
Cách cài đặt Phân phối Python Anaconda trên Ubuntu 20.04
2020-05-06
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên server Ubuntu 20.04
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên server Ubuntu 18.04
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên Ubuntu 20.04 [Quickstart]
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên Ubuntu 18.04 [Quickstart]
2020-04-24
Cách cài đặt Python 3 và thiết lập môi trường lập trình trên CentOS 8
2020-04-10
Cách bắt đầu với Python trong Visual Studio Code
2020-04-09