pythonオリンピック

オリンピックが楽しみです

Amazon 全カテゴリの人気ランキングをpythonで取得

はじめまして

処理内容

  • カテゴリID(親と子)を取得し、データベースに保存
  • ランキングデータを取得しデータベースに保存(全カテゴリ)
  • CSVで出力

アマゾン人気ランキングの作成に掛かった時間

  • 5時間

作業内容

使用したモジュール

  • BeautifulSoup
  • requests

オリジナルモジュール

  • myDB

注意

  • 下記のコードをそのまま使用しても動作しません。※オリジナルモジュール(myDB)が同梱されていないため
  • 2016/11/12の情報なので、アマゾン側のHTML構成等が変更された場合は修正が必要となります。
#!/usr/bin/python
#coding:utf-8

import requests
import os, sys, datetime, glob, re, time
from BeautifulSoup import BeautifulSoup
from mymodule import myDB

# Date label ("YYYY/MM/DD") attached to this crawl. It selects the rows for
# the CSV export, is written to the td_log and ranking rows, and is part of
# each ranking row's key string.
GET_RANKING_DATE = "2016/11/12"

class Ctrl:
    """Switchboard deciding which steps of the script are executed.

    Every ``*_flg`` attribute enables one step; ``process`` runs the enabled
    steps in a fixed order.
    """

    def __init__(self):
        # Export the rows stored for GET_RANKING_DATE to a CSV file.
        self.csv_output_flg = True
        # Insert the ranking types into their master table.
        self.ranking_types_flg = False
        # Fetch and store the parent genre IDs.
        self.parent_genreid_get_flg = False
        # Fetch and store the child genre IDs.
        self.child_genreid_get_flg = False
        # Fetch the "bestsellers" ranking.
        self.prd_ranking_get_flg = True

    def process(self):
        """Run each enabled step: CSV export first, ranking fetch last.

        NOTE: ``Exe.ranking_csv_output`` and ``Exe.prd_ranking_get`` both call
        ``sys.exit()``, so nothing listed after either of them is reached
        once it has run.
        """
        steps = (
            (self.csv_output_flg,
             lambda: Exe().ranking_csv_output(GET_RANKING_DATE)),
            (self.ranking_types_flg,
             lambda: Exe().ranking_type_insert()),
            (self.parent_genreid_get_flg,
             lambda: Exe().genreid_parse_insert()),
            (self.child_genreid_get_flg,
             lambda: Exe().child_genreid_parse_insert()),
            # ranking_get
            (self.prd_ranking_get_flg,
             lambda: Exe().prd_ranking_get("bestsellers")),
        )
        for enabled, run_step in steps:
            if enabled:
                run_step()

class Exe:
    """The individual steps of the script: master setup, ranking fetch, CSV export."""

    def __init__(self):
        pass

    def ranking_csv_output(self, ranking_date):
        """Write the td_ranking_prd rows of ``ranking_date`` to a CSV file, then exit.

        Args:
            ranking_date: Date string in the same "YYYY/MM/DD" form that is
                stored in ``ranking_get_date``; slashes become underscores in
                the output file name.

        The file ``amazon_ranking_<date>.csv`` is created in the current
        directory and the process is terminated with ``sys.exit()``.
        """
        import csv

        tl = "td_ranking_prd"
        sql = "select * from %s where ranking_get_date = '%s' order by create_date asc" % (tl, ranking_date)
        rows = DB().conn(tl).Select_Many(sql)
        filename = "amazon_ranking_" + ranking_date.replace("/", "_") + ".csv"
        with open(filename, "w") as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerows([list(r) for r in rows])
            print("Csv Output")
        sys.exit()

    def prd_ranking_get(self, raking_eu_name):
        """Fetch and store one ranking type for every parent and child genre.

        Args:
            raking_eu_name: URL name of the ranking type (e.g. "bestsellers");
                it must already exist in tm_ranking_type.

        Genres that already have a td_log entry for GET_RANKING_DATE are
        skipped, so an interrupted run can be restarted. The process exits
        with ``sys.exit()`` once all parent genres have been handled.
        """
        tl = "tm_ranking_type"
        sql = "select id from %s where eu_name = '%s' limit 1" % (tl, raking_eu_name)
        ranking_type_id = DB().conn(tl).Select(sql)["id"]

        def access_and_parse(l_parent, l_child=False):
            """Download one ranking page and return its parsed rows."""
            # The child's Amazon "genreid" is used both in the URL and as the
            # child value handed to the parser; False means parent-level page.
            l_child_db_id = False
            if l_child:
                l_child_db_id = l_child["genreid"]

            url = Url().ranking_page_url(1, raking_eu_name, l_parent["eu_name"], l_child_db_id)
            html = Req().open(url)
            return Parse(html).ranking_prd_get(ranking_type_id, l_parent["id"], l_child_db_id)

        def db_insert(ll_prd_data, l_parent_db_id, l_child_db_id=False):
            """Store the parsed rows and log the genre as done."""
            DB().ranking_prd_data_insert(ll_prd_data)
            Log().insert(l_parent_db_id, l_child_db_id)

        for l_parent in DB().genreid_selects_ll():
            # parent ranking
            if not DB().log_check(l_parent["id"]):
                ll = access_and_parse(l_parent)
                if ll:
                    db_insert(ll, l_parent["id"])

            # parent in child ranking
            for l_child in DB().parent_rel_child_genreid_select_l(l_parent["eu_name"]):
                if not DB().log_check(l_parent["id"], l_child["id"]):
                    ll = access_and_parse(l_parent, l_child)
                    if ll:
                        db_insert(ll, l_parent["id"], l_child["id"])
                    # Pause between requests.
                    time.sleep(2)

        # Exit only after the whole loop: exiting inside it stopped the crawl
        # after the first parent genre, so later genres were never fetched.
        sys.exit()

    def ranking_type_insert(self):
        """Insert the five ranking types (display name, URL name) into tm_ranking_type."""
        l = [["売れ筋", "bestsellers"],
             ["新着", "new-releases"],
             ["人気度", "movers-and-shakers"],
             ["ほしい物", "most-wished-for"],
             ["人気ギフト", "most-gifted"]]
        tl = "tm_ranking_type"
        DB().conn(tl).Insert_All(tl, [[None, i[0], i[1], ""] for i in l])

    def child_genreid_parse_insert(self):
        """Fetch each parent genre's ranking page and store its child genres.

        Exits with a hint when the parent genre database file does not exist
        yet. After each parent is processed its update date is refreshed.
        """
        if not glob.glob(Path().parent_genreid_db()):
            print("Create Parent Genreid Database")
            sys.exit()

        ll_parent_genreid = DB().genreid_selects_ll()
        for i, row in enumerate(ll_parent_genreid):
            print("%s/%s now parse : %s" % (i, len(ll_parent_genreid), row["eu_name"]))
            html = Req().open(Url().s_parent_ranking_url(row["eu_name"]))
            ll_child_genreid = Parse(html).ll_child_genreid()
            if ll_child_genreid:
                DB().child_genreid_insert(ll_child_genreid, row)
            DB().parent_genreid_update(row["id"])

    def genreid_parse_insert(self):
        """Fetch the ranking top page and store the parent genres found on it."""
        html = Req().open(Url().amazon_popular_ranking())
        l_parent_genreid = Parse(html).parent_genreid()
        if l_parent_genreid:
            DB().parent_genreid_insert(l_parent_genreid)

class Log:
    """Writes rows to the td_log table."""

    def insert(self, parent_db_id, child_db_id=False):
        """Append a td_log row for the given genre and GET_RANKING_DATE.

        Args:
            parent_db_id: Parent genre id to log.
            child_db_id: Child genre id; any falsy value is stored as "".
        """
        logged_at = str(datetime.datetime.now())
        record = [None, parent_db_id, child_db_id or "", GET_RANKING_DATE, logged_at]
        DB().conn("td_log").Insert("td_log", record)

class Path:
    """File locations used by the script."""

    def __init__(self):
        pass

    def parent_genreid_db(self):
        """Return the relative path of the tm_genreid_parent database file."""
        db_file = "db/master/tm_genreid_parent.db"
        return db_file

class Parse:
    """Extracts genre links and ranking items from an Amazon ranking page."""

    # Captures the text that follows the star-rating label prefix.
    _RATING_RE = re.compile(r"5つ星のうち\s(.*)")

    def __init__(self, html):
        self.soup = BeautifulSoup(html)

    def ranking_prd_get(self, ranking_type_id, parent_db_id, child_db_id=""):
        """Parse every ranking item on the page into td_ranking_prd rows.

        Products not yet present in tm_prd are inserted there as a side
        effect, and each item's byline is registered through
        ``DB().search_word_id``.

        Args:
            ranking_type_id: Id of the ranking type being fetched.
            parent_db_id: Id of the parent genre the page belongs to.
            child_db_id: Child genre value stored with each row, as given.

        Returns:
            A list of row lists, one per ranking item; empty when the page
            contains no ``zg_item_normal`` blocks.
        """

        def rank(item):
            return item.find("span", attrs={"class": "zg_rankNumber"}).string

        def code(item):
            # The product code is the last path segment of the title link.
            return item.find("div", attrs={"class": "zg_title"}).find("a")["href"].split("/")[-1].rstrip()

        def title(item):
            return item.find("div", attrs={"class": "zg_title"}).find("a").string

        def img_url(item):
            # Only the image file name is kept, not the full URL.
            return item.find("div", attrs={"class": "zg_itemImage_normal"}).find("img")["src"].split("/")[-1]

        def price(item):
            p_soup = item.find("span", attrs={"class": "price"})
            if p_soup:
                # Strip the leading yen sign (UTF-8 bytes) and the space after it.
                return str(p_soup.find("b").string).replace("\xef\xbf\xa5 ", "")
            return ""

        def search_word(item):
            soup = item.find("div", attrs={"class": "zg_byline"})
            if soup:
                return soup.string.strip()
            return ""

        def rating_point(item):
            soup = item.find("span", attrs={"class": "a-icon-alt"})
            if soup:
                matched = self._RATING_RE.search(str(soup.string))
                # A label that does not match the pattern is treated the same
                # as a missing rating instead of raising on ``None.group``.
                if matched:
                    return matched.group(1)
            return ""

        def rating_cnt(item):
            soup = item.find("span", attrs={"class": "a-size-small"})
            if soup:
                return soup.find("a").string
            return 0

        now = datetime.datetime.now()
        ll = []
        for l_item in self.soup.findAll("div", attrs={"class": "zg_item_normal"}):
            search_word_str = search_word(l_item)
            search_word_id = DB().search_word_id(search_word_str)
            prd_code = code(l_item)
            prd_id = DB().prd_select(prd_code)
            price_str = price(l_item)
            rating_point_str = rating_point(l_item)
            rating_cnt_str = rating_cnt(l_item)
            if not prd_id:
                # First sighting of this product: register it, then read back its id.
                l = [None, prd_code, title(l_item), img_url(l_item), rating_point_str,
                     rating_cnt_str, price_str, search_word_id, search_word_str, 0, now]
                DB().conn("tm_prd").Insert("tm_prd", l)
                prd_id = DB().prd_select(prd_code)

            s_rank = rank(l_item)
            u_key = "_".join([GET_RANKING_DATE, str(prd_code), str(ranking_type_id)])
            print(u_key)
            # The rating score is followed by the review count; previously the
            # score was written into both columns.
            ll.append([None, u_key, prd_id, prd_code, ranking_type_id, parent_db_id,
                       child_db_id, s_rank, price_str,
                       rating_point_str,
                       rating_cnt_str,
                       0,
                       GET_RANKING_DATE,
                       now,
                       ])
        return ll

    def _browse_root_links(self, skip_first=False):
        """Return ``[last href segment, link text]`` pairs from the zg_browseRoot list.

        Returns an empty list when the page has no ``zg_browseRoot`` element.
        """
        roots = self.soup.findAll("ul", attrs={"id": "zg_browseRoot"})
        if not roots:
            return []
        items = roots[0].findAll("li")
        if skip_first:
            items = items[1:]
        links = []
        for li in items:
            a_soup = li.find("a")
            if a_soup:
                links.append([a_soup["href"].split("/")[-1], str(a_soup.string)])
        return links

    def ll_child_genreid(self):
        """Return the child genre links, skipping the first entry (all-category link)."""
        return self._browse_root_links(skip_first=True)

    def parent_genreid(self):
        """Return the parent genre links listed on the page."""
        return self._browse_root_links()

class Req:
    """Minimal HTTP GET helper built on requests."""

    def __init__(self):
        pass

    def open(self, url, timeout=30):
        """Fetch ``url`` and return the response body as text.

        Args:
            url: Address to request; it is printed before the request is sent.
            timeout: Seconds to wait for the server. requests applies no
                timeout unless one is given, so a stalled connection would
                otherwise block the crawl forever.

        Returns:
            The response body as decoded by requests.

        Raises:
            requests.exceptions.RequestException: On connection failure or
                when the timeout elapses.
        """
        print(url)
        return requests.get(url, timeout=timeout).text

class Url:
    """Builds the amazon.co.jp URLs the script requests."""

    def __init__(self):
        pass

    def ranking_page_url(self, p_num, raking_eu_name, parent_eu_name, child_genreid=False):
        """Return the ranking URL for a parent genre or one of its child genres.

        Args:
            p_num: Page number, appended as a ``#`` fragment on child URLs only.
            raking_eu_name: URL name of the ranking type.
            parent_eu_name: URL name of the parent genre.
            child_genreid: Child genre id; when falsy the parent URL is returned.
        """
        base = "https://www.amazon.co.jp/gp/%s/%s/" % (raking_eu_name, parent_eu_name)
        if child_genreid:
            return "%s%s/#%s" % (base, child_genreid, p_num)
        return base

    def s_parent_ranking_url(self, eu_name):
        """Return the bestsellers URL of the parent genre ``eu_name``."""
        template = "https://www.amazon.co.jp/gp/bestsellers/%s/ref=zg_bs_nav_0"
        return template % eu_name

    def amazon_popular_ranking(self):
        """Return the URL of the page the parent genres are read from."""
        top_page = "https://www.amazon.co.jp/gp/most-gifted/"
        return top_page

class DB:
    """Query helpers over the per-table database files opened through myDB.

    Every helper obtains its handle from ``conn``, which creates a new
    ``myDB.Database`` on each call; nothing in this class closes it.
    """

    def __init__(self):
        pass

    def _quote(self, value):
        """Return ``value`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so that text taken from scraped
        pages (bylines, product codes, genre names) cannot terminate the
        literal early and break the statement.
        """
        return "'%s'" % ("%s" % value).replace("'", "''")

    def ranking_prd_data_insert(self, ll_prd_data):
        """Insert the parsed ranking rows into td_ranking_prd."""
        tl = "td_ranking_prd"
        self.conn(tl).Insert_All(tl, ll_prd_data)

    def log_check(self, parent_db_id, child_db_id=False):
        """Return True when td_log already has an entry for this genre and date.

        Args:
            parent_db_id: Parent genre id.
            child_db_id: Child genre id; when falsy the parent-level entry
                (empty child column) is looked up instead.
        """
        tl = "td_log"
        sql = "select id from %s where " % tl
        sql += "parent_genreid_id = %s and " % parent_db_id

        if child_db_id:
            sql += "child_genreid_id = %s and " % child_db_id
        else:
            # parent ranking
            sql += "child_genreid_id = '' and "

        sql += "ranking_get_date = '%s' limit 1" % GET_RANKING_DATE
        print(sql)
        return bool(self.conn(tl).Select(sql))

    def prd_select(self, prd_code):
        """Return the tm_prd id for ``prd_code``, or False when it is not stored."""
        tl = "tm_prd"
        sql = "select id from %s where prd_code = %s limit 1" % (tl, self._quote(prd_code))
        row = self.conn(tl).Select(sql)
        if row:
            return row["id"]
        return False

    def search_word_id(self, word):
        """Return the tm_search_word id of ``word``, inserting the word first if needed."""
        tl = "tm_search_word"
        sql = "select id from %s where word = %s" % (tl, self._quote(word))
        row = self.conn(tl).Select(sql)
        if row:
            return row["id"]
        self.conn(tl).Insert(tl, [None, word, 0, ""])
        # Read the row back to learn the id it was given.
        return self.conn(tl).Select(sql)["id"]

    def parent_rel_child_genreid_select_l(self, eu_name):
        """Return the child genres (id, jp_name, genreid) stored under parent ``eu_name``."""
        tl = "tm_genreid_child"
        sql = "select id, jp_name, genreid from %s where current_genreid = %s" % (tl, self._quote(eu_name))
        return self.conn(tl).Select_Many(sql)

    def parent_genreid_update(self, parent_db_id):
        """Set ``update_date`` of the given parent genre row to the current time."""
        tl = "tm_genreid_parent"
        now = datetime.datetime.now()
        sql = "update %s set update_date = '%s' where id = %s" % (tl, now, parent_db_id)
        return self.conn(tl).Update(sql)

    def child_genreid_insert(self, ll_child_genreid, p_row):
        """Insert child genres parsed from the page of parent row ``p_row``.

        Args:
            ll_child_genreid: ``[genreid, name]`` pairs as returned by the parser.
            p_row: Parent genre row; its ``id`` and ``eu_name`` are stored
                with every child.
        """
        tl = "tm_genreid_child"
        now = datetime.datetime.now()
        ll = [[None, i[1], i[0], p_row["id"], p_row["eu_name"], now, ""] for i in ll_child_genreid]
        self.conn(tl).Insert_All(tl, ll)

    def genreid_selects_ll(self):
        """Return every row of tm_genreid_parent."""
        tl = "tm_genreid_parent"
        return self.conn(tl).Select_Many("select * from %s" % tl)

    def parent_genreid_insert(self, l_parent_genreid):
        """Insert parent genres given as two-element lists from the parser."""
        tl = "tm_genreid_parent"
        now = datetime.datetime.now()
        ll = [[None, i[0], i[1], 0, now, ""] for i in l_parent_genreid]
        self.conn(tl).Insert_All(tl, ll)

    def conn(self, tbl_name):
        """Open and return a myDB handle for the file that holds ``tbl_name``.

        The two-letter table prefix picks the directory: ``td`` -> ``db/data``,
        ``tm`` -> ``db/master``. Any other prefix raises ``KeyError``.
        """
        db_dic = {"td": "data", "tm": "master"}
        db = myDB.Database()
        db.db_path = "db/%s/%s.db" % (db_dic[tbl_name[:2]], tbl_name)
        db.Connection()
        return db
        
def main():
    """Script entry point: run the steps enabled on a fresh ``Ctrl``."""
    Ctrl().process()

# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

テーブル

  • td_log.txt
  • td_ranking_prd.txt
  • tm_genreid_child.txt
  • tm_genreid_parent.txt
  • tm_prd.txt
  • tm_ranking_type.txt
  • tm_search_word.txt