pythonにて並列処理をするプログラムを作る機会がありました
いつもはthreadingというモジュールで並列処理をしておりプログラムもそのモジュールにて組みました
しかし、調べてみると最近はconcurrent.futuresというモジュールでやるのが良いということを知りました
なので勉強がてらやってみたのメモとして書いておきます
concurrent.futuresとは
Python3.2で追加された並列処理をするためのライブラリです
“ThreadPoolExecutor”とすればスレッドで並列処理を行います
“ProcessPoolExecutor”とすれば別々のプロセスで処理を行います
そのため、ひとつのモジュールをインポートすればスレッドとプロセスのどちらかで並列処理をしてくれます(個別でThreadingやmultiprocessingをインポートする必要がなくなった)
参考:https://docs.python.jp/3/library/concurrent.futures.html
プログラム
並列処理をするにあたり、以前に作成したWebスクレイピングするプログラムをベースに作成しました
tenki.jpとウェザーニュースの2つの情報をスレッドの並列処理でとってくるプログラムを作りました
参考:
また参考として逐次で天気情報を取得するプログラムも追加し、速度検証をしてみました
#!/usr/bin/python # -*- Coding: utf-8 -*- from concurrent import futures import requests import time from bs4 import BeautifulSoup #緯度経度を取得 def Location(address): Url = "https://www.geocoding.jp/api/" Params = {"q":address} Req = requests.get(Url, params=Params) Soup = BeautifulSoup(Req.text, 'lxml') return Soup.find("lat").text, Soup.find("lng").text #WeatherNewsの取得 def WeatherNews(address): Url = "https://weathernews.jp/onebox/" #緯度経度を取得 (Lat, Lng) = Location(address) SrhUrl = Url + Lat + '/' + Lng + "/lang=ja" Req = requests.get(SrhUrl) Soup = BeautifulSoup(Req.text, 'lxml') #Dict myDict = {} #天気 myDict["天気"] = Soup.find(class_="sub").text.split(", ")[1] #気温 myDict["気温"] = Soup.find(class_="obs_temp_main").text #風速とかの情報 for val in Soup.find(class_="table-obs_sub").find_all("tr"): myDict[val.find(class_="obs_sub_title").text] = val.find(class_="obs_sub_value").text.replace(" : ","") return myDict #Tenkiの取得 def Tenki(address): Url = "https://tenki.jp" Req = requests.get(Url + "/search/?keyword=" + address) Soup = BeautifulSoup(Req.text, 'lxml') Sed = Soup.find_all(class_="search-entry-data") HrfUrl = None for val in Sed: if val.find(class_="address").text.find("以下に掲載がない場合"): HrfUrl = val.a.get("href") break myDict = {} #住所からhrefを取得 if not(HrfUrl is None): Req = requests.get(Url + HrfUrl) Soup = BeautifulSoup(Req.text, 'lxml') TodaySoup = Soup.find(class_="today-weather") #気温(最高) myDict["気温(最高)"] = TodaySoup.find(class_="weather-wrap").find(class_="high-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="high-temp tempdiff").text #気温(最低) myDict["気温(最低)"] = TodaySoup.find(class_="weather-wrap").find(class_="low-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="low-temp tempdiff").text #天気 myDict["天気"] = TodaySoup.find(class_="weather-wrap").find(class_="weather-telop").text #風 myDict["風"] = TodaySoup.find(class_="wind-wave").find("td").text return myDict def main(address): print("Start No Thread Process") start = time.time() print(WeatherNews(address)) print(Tenki(address)) end = time.time() print('time = {}'.format(end - start)) print("======") time.sleep(10) #apiのcallのためのwait print("Start Thread Process") start = time.time() myThreads = list() with futures.ThreadPoolExecutor() as executor: #Thread化 myThreads.append(executor.submit(WeatherNews, address)) myThreads.append(executor.submit(Tenki, address)) #結果を出力 for myThread in myThreads: print(myThread.result()) end = time.time() print('time = {}'.format(end - start)) if __name__ == '__main__': main("石川県加賀市")
実行結果
約0.6秒くらい早くなりました
2つのサイトをスクレイピングしているだけなのでThreadしてもしなくても結果はかわりませんが、複数だったら大きく変わると思います
Start No Thread Process {'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'} {'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'} time = 3.6397287845611572 ====== Start Thread Process {'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'} {'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'} time = 2.95491361618042
プログラムの解説
スクレイピング方法は前の記事にて書いたので省きます
重要なところは下記のところです
myThreads = list() with futures.ThreadPoolExecutor() as executor: #Thread化 myThreads.append(executor.submit(WeatherNews, address)) myThreads.append(executor.submit(Tenki, address)) #結果を出力 for myThread in myThreads: print(myThread.result())
myThreadというリスト(配列)をつくり、その中にスレッド(executor.submit)を入れています
定義の仕方はexecutor.submit(スレッド化させる関数,引数)となっています
最後にfor文にて結果(myThread.result())を出力しています
またwith futures.ThreadPoolExecutor() as executor:のところを”with futures.ProcessPoolExecutor()”とするとプロセスで動きます
おまけプログラム
上記のプログラムは、”executor.submit”で逐次スレッド化するものを追加していました
しかしexecutor.mapを使うと一括でスレッド化するものを追加できます
おまけとして、myWaitTimeというリストのなかに1~10の値をいれ、.mapで一括追加して動かしてみました
#!/usr/bin/python # -*- Coding: utf-8 -*- import time from concurrent import futures def myWait(argv): time.sleep(argv) print("finish:", str(argv) + "sec") def main(): myWaitTime = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] start = time.time() with futures.ThreadPoolExecutor() as executor: executor.map(myWait, myWaitTime) end = time.time() print('time = {}'.format(end - start)) if __name__ == '__main__': main()
結果
finish: 1sec
finish: 2sec
finish: 3sec
finish: 4sec
finish: 5sec
finish: 6sec
finish: 7sec
finish: 8sec
finish: 9sec
finish: 10sec
time = 10.004080295562744
引数をリストで渡してしまえばおkなのところは便利です
コメント