Playground

concurrent.futuresを使った並列処理の勉強メモ

Playground
この記事は約10分で読めます。
スポンサーリンク

pythonにて並列処理をするプログラムを作る機会がありました

いつもはthreadingというモジュールで並列処理をしておりプログラムもそのモジュールにて組みました

しかし、調べてみると最近はconcurrent.futuresというモジュールでやるのが良いということを知りました

なので勉強がてらやってみたのメモとして書いておきます

concurrent.futuresとは

Python3.2で追加された並列処理をするためのライブラリです

“ThreadPoolExecutor”とすればスレッドで並列処理を行います

“ProcessPoolExecutor”とすれば別々のプロセスで処理を行います

そのため、ひとつのモジュールをインポートすればスレッドとプロセスのどちらかで並列処理をしてくれます(個別でThreadingやmultiprocessingをインポートする必要がなくなった)

参考:https://docs.python.jp/3/library/concurrent.futures.html

プログラム

並列処理をするにあたり、以前に作成したWebスクレイピングするプログラムをベースに作成しました

tenki.jpとウェザーニュースの2つの情報をスレッドの並列処理でとってくるプログラムを作りました

参考:

tenki.jpをWebスクレイピングしてみた
Webスクレイピングをやりたい思いました とくに深い意味はないのですが、やってみたいなーという気になりました 身近でお手軽なものということで天気情報を発信しているサイトをスクレイピングしてみました 手始めにtenki.jp...
ウェザーニュースをWebスクレイピングしてみた
Webスクレイピングしてみたい第二弾です Tenki.jpのスクレイピングに引き続き、次はウェザーニュースをスクレイピングしてみました

 

また参考として逐次で天気情報を取得するプログラムも追加し、速度検証をしてみました

#!/usr/bin/python
# -*- Coding: utf-8 -*-
from concurrent import futures
import requests
import time
from bs4 import BeautifulSoup


#緯度経度を取得
def Location(address):
    Url = "https://www.geocoding.jp/api/"
    Params = {"q":address}
    Req = requests.get(Url, params=Params)
    Soup = BeautifulSoup(Req.text, 'lxml')


    return Soup.find("lat").text, Soup.find("lng").text


#WeatherNewsの取得
def WeatherNews(address):
    Url = "https://weathernews.jp/onebox/"

    #緯度経度を取得
    (Lat, Lng) = Location(address)

    SrhUrl = Url + Lat + '/' + Lng + "/lang=ja"

    Req = requests.get(SrhUrl)
    Soup = BeautifulSoup(Req.text, 'lxml')

    #Dict
    myDict = {}

    #天気
    myDict["天気"] = Soup.find(class_="sub").text.split(", ")[1]

    #気温
    myDict["気温"] = Soup.find(class_="obs_temp_main").text

    #風速とかの情報
    for val in Soup.find(class_="table-obs_sub").find_all("tr"):
        myDict[val.find(class_="obs_sub_title").text] = val.find(class_="obs_sub_value").text.replace(" : ","")

    return myDict

#Tenkiの取得
def Tenki(address):
    Url = "https://tenki.jp"

    Req = requests.get(Url + "/search/?keyword=" + address)
    Soup = BeautifulSoup(Req.text, 'lxml')


    Sed = Soup.find_all(class_="search-entry-data")

    HrfUrl = None
    for val in Sed:
        if val.find(class_="address").text.find("以下に掲載がない場合"):
            HrfUrl = val.a.get("href")
            break


    myDict = {}
    #住所からhrefを取得
    if not(HrfUrl is None):
        Req = requests.get(Url + HrfUrl)
        Soup = BeautifulSoup(Req.text, 'lxml')

        TodaySoup = Soup.find(class_="today-weather")

        #気温(最高)
        myDict["気温(最高)"] = TodaySoup.find(class_="weather-wrap").find(class_="high-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="high-temp tempdiff").text

        #気温(最低)
        myDict["気温(最低)"] = TodaySoup.find(class_="weather-wrap").find(class_="low-temp temp").find(class_="value").text + TodaySoup.find(class_="weather-wrap").find(class_="low-temp tempdiff").text

        #天気
        myDict["天気"] = TodaySoup.find(class_="weather-wrap").find(class_="weather-telop").text

        #風
        myDict["風"] = TodaySoup.find(class_="wind-wave").find("td").text

    return myDict




def main(address):
    print("Start No Thread Process")
    start = time.time()
    print(WeatherNews(address))
    print(Tenki(address))
    end = time.time()
    print('time = {}'.format(end - start))
    print("======")

    time.sleep(10) #apiのcallのためのwait

    print("Start Thread Process")
    start = time.time()
    myThreads = list()
    with futures.ThreadPoolExecutor() as executor:

        #Thread化
        myThreads.append(executor.submit(WeatherNews, address))
        myThreads.append(executor.submit(Tenki, address))

        #結果を出力
        for myThread in myThreads:
            print(myThread.result())


    end = time.time()
    print('time = {}'.format(end - start))



if __name__ == '__main__':
    main("石川県加賀市")

実行結果

約0.6秒くらい早くなりました

2つのサイトをスクレイピングしているだけなのでThreadしてもしなくても結果はかわりませんが、複数だったら大きく変わると思います

Start No Thread Process
{'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'}
{'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'}
time = 3.6397287845611572
======
Start Thread Process
{'天気': 'くもり', '気温': '24.1', '湿度': '70 %', '気圧': '1022 hPa', '風': '北北東 1 m/s', '日の出': '05:53', '日の入': '17:34'}
{'気温(最高)': '24[0]', '気温(最低)': '18[+3]', '天気': '曇', '風': '北の風後北東の風'}
time = 2.95491361618042

プログラムの解説

スクレイピング方法は前の記事にて書いたので省きます

重要なところは下記のところです

    myThreads = list()
    with futures.ThreadPoolExecutor() as executor:

        #Thread化
        myThreads.append(executor.submit(WeatherNews, address))
        myThreads.append(executor.submit(Tenki, address))

        #結果を出力
        for myThread in myThreads:
            print(myThread.result())

myThreadというリスト(配列)をつくり、その中にスレッド(executor.submit)を入れています

定義の仕方はexecutor.submit(スレッド化させる関数,引数)となっています

最後にfor文にて結果(myThread.result())を出力しています

 

またwith futures.ThreadPoolExecutor() as executor:のところを”with futures.ProcessPoolExecutor()”とするとプロセスで動きます

おまけプログラム

上記のプログラムは、”executor.submit”で逐次スレッド化するものを追加していました

しかしexecutor.mapを使うと一括でスレッド化するものを追加できます

 

おまけとして、myWaitTimeというリストのなかに1~10の値をいれ、.mapで一括追加して動かしてみました

#!/usr/bin/python
# -*- Coding: utf-8 -*-
import time
from concurrent import futures


def myWait(argv):
    time.sleep(argv)
    print("finish:", str(argv) + "sec")


def main():
    myWaitTime = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    start = time.time()
    with futures.ThreadPoolExecutor() as executor:
        executor.map(myWait, myWaitTime)

    end = time.time()
    print('time = {}'.format(end - start))



if __name__ == '__main__':
    main()

 

結果

finish: 1sec
finish: 2sec
finish: 3sec
finish: 4sec
finish: 5sec
finish: 6sec
finish: 7sec
finish: 8sec
finish: 9sec
finish: 10sec
time = 10.004080295562744

 

引数をリストで渡してしまえばおkなのところは便利です

コメント

タイトルとURLをコピーしました