注意
- 下記のコードをそのまま使用しても動作しません。※オリジナルモジュール(mymodule)が含まれていないため
- 2016/11/12の情報なので、アマゾン側のHTML構成等が変更された場合は修正が必要となります。
import requests
import os, sys, datetime, glob, re, time
from BeautifulSoup import BeautifulSoup
from mymodule import myDB
# Ranking date string shared across the script: it selects the rows for the
# CSV export, is stored in every ranking row and fetch-log entry, and is part
# of each ranking row's unique key.
GET_RANKING_DATE = "2016/11/12"
class Ctrl:
    """Switchboard that decides which ``Exe`` jobs a run performs."""

    def __init__(self):
        # One on/off switch per job; edit these to choose what a run does.
        self.csv_output_flg = True
        self.ranking_types_flg = False
        self.parent_genreid_get_flg = False
        self.child_genreid_get_flg = False
        self.prd_ranking_get_flg = True

    def process(self):
        """Run every enabled job, in a fixed order, each on a fresh ``Exe``."""
        # (flag attribute, job) pairs; the flag is looked up only when its
        # turn comes, and the job is only built/called when the flag is set.
        jobs = (
            ("csv_output_flg",
             lambda: Exe().ranking_csv_output(GET_RANKING_DATE)),
            ("ranking_types_flg",
             lambda: Exe().ranking_type_insert()),
            ("parent_genreid_get_flg",
             lambda: Exe().genreid_parse_insert()),
            ("child_genreid_get_flg",
             lambda: Exe().child_genreid_parse_insert()),
            ("prd_ranking_get_flg",
             lambda: Exe().prd_ranking_get("bestsellers")),
        )
        for flag_name, job in jobs:
            if getattr(self, flag_name):
                job()
class Exe:
    """Jobs started by ``Ctrl.process``: CSV export, master-data setup, scraping."""

    # Seconds to pause after each ranking-page request in ``prd_ranking_get``.
    REQUEST_INTERVAL_SEC = 2

    def __init__(self):
        pass

    def ranking_csv_output(self, ranking_date):
        """Export the ``td_ranking_prd`` rows of ``ranking_date`` to CSV, then exit.

        The file name is ``amazon_ranking_<ranking_date with "/" -> "_">.csv``
        (relative to the working directory).  The method ends the process with
        ``sys.exit()``, so it never returns to the caller.
        """
        import csv  # local import, as in the original code

        tl = "td_ranking_prd"
        sql = "select * from %s where ranking_get_date = '%s' order by create_date asc" % (tl, ranking_date)
        rows = DB().conn(tl).Select_Many(sql)
        filename = "amazon_ranking_" + ranking_date.replace("/", "_") + ".csv"
        with open(filename, "w") as f:
            writer = csv.writer(f, lineterminator="\n")
            writer.writerows([list(r) for r in rows])
        # Parenthesised single-argument print: same output as the Python 2
        # print statement.
        print("Csv Output")
        sys.exit()

    def prd_ranking_get(self, raking_eu_name):
        """Scrape the ranking page of every parent genre and each child genre.

        ``raking_eu_name`` is the ranking type's ``eu_name`` (e.g.
        ``"bestsellers"``); its id is looked up in ``tm_ranking_type``.
        A page already recorded in the fetch log is skipped, so an
        interrupted run can be restarted.  Parsed rows are stored and logged
        only when the parser returned something.  The method ends the process
        with ``sys.exit()``.
        """
        tl = "tm_ranking_type"
        sql = "select id from %s where eu_name = '%s' limit 1" % (tl, raking_eu_name)
        ranking_type_id = DB().conn(tl).Select(sql)["id"]

        def access_and_parse(l_parent, l_child=False):
            # The child is addressed by its Amazon genre id ("genreid"), both
            # in the URL and in the value handed to the parser.
            # NOTE(review): the parser receives this genre id as its
            # ``child_db_id`` argument while the fetch log uses the child's DB
            # row id -- confirm which one td_ranking_prd expects.
            child_genreid = False
            if l_child:
                child_genreid = l_child["genreid"]
            url = Url().ranking_page_url(1, raking_eu_name, l_parent["eu_name"], child_genreid)
            html = Req().open(url)
            return Parse(html).ranking_prd_get(ranking_type_id, l_parent["id"], child_genreid)

        def db_insert(ll_prd_data, l_parent_db_id, l_child_db_id=False):
            DB().ranking_prd_data_insert(ll_prd_data)
            Log().insert(l_parent_db_id, l_child_db_id)

        def fetch_and_store(l_parent, l_child=False):
            # One real HTTP request happens here, so this is also where the
            # pause belongs: every fetch (parent or child) is throttled, and
            # pages skipped via the log cost no waiting time.
            ll = access_and_parse(l_parent, l_child)
            if ll:
                if l_child:
                    db_insert(ll, l_parent["id"], l_child["id"])
                else:
                    db_insert(ll, l_parent["id"])
            time.sleep(self.REQUEST_INTERVAL_SEC)

        for l_parent in DB().genreid_selects_ll():
            if not DB().log_check(l_parent["id"]):
                fetch_and_store(l_parent)
            for l_child in DB().parent_rel_child_genreid_select_l(l_parent["eu_name"]):
                if not DB().log_check(l_parent["id"], l_child["id"]):
                    fetch_and_store(l_parent, l_child)
        sys.exit()

    def ranking_type_insert(self):
        """Insert the five ranking types (label, ``eu_name``) into ``tm_ranking_type``."""
        ranking_types = [
            ["売れ筋", "bestsellers"],
            ["新着", "new-releases"],
            ["人気度", "movers-and-shakers"],
            ["ほしい物", "most-wished-for"],
            ["人気ギフト", "most-gifted"],
        ]
        tl = "tm_ranking_type"
        DB().conn(tl).Insert_All(tl, [[None, i[0], i[1], ""] for i in ranking_types])

    def child_genreid_parse_insert(self):
        """Fetch each parent genre's page and store the child genres found on it.

        Exits early (after printing a hint) when the parent-genre database
        file does not exist yet.
        """
        if not glob.glob(Path().parent_genreid_db()):
            print("Create Parent Genreid Database")
            sys.exit()
        ll_parent_genreid = DB().genreid_selects_ll()
        total = len(ll_parent_genreid)
        for i, row in enumerate(ll_parent_genreid):
            # 1-based progress so the last line reads "N/N" instead of "N-1/N".
            print("%s/%s now parse : %s" % (i + 1, total, row["eu_name"]))
            html = Req().open(Url().s_parent_ranking_url(row["eu_name"]))
            ll_child_genreid = Parse(html).ll_child_genreid()
            if ll_child_genreid:
                DB().child_genreid_insert(ll_child_genreid, row)
                # NOTE(review): the source lost its indentation; the parent's
                # update_date is assumed to be refreshed only when children
                # were stored -- confirm.
                DB().parent_genreid_update(row["id"])

    def genreid_parse_insert(self):
        """Fetch the top ranking page and store the parent genres listed on it."""
        html = Req().open(Url().amazon_popular_ranking())
        l_parent_genreid = Parse(html).parent_genreid()
        if l_parent_genreid:
            DB().parent_genreid_insert(l_parent_genreid)
class Log:
    """Writes fetch-log entries to the ``td_log`` table."""

    def insert(self, parent_db_id, child_db_id=False):
        """Record that a genre page was fetched for ``GET_RANKING_DATE``.

        A falsy ``child_db_id`` (parent-only page) is stored as an empty
        string.
        """
        child_value = child_db_id if child_db_id else ""
        created_at = str(datetime.datetime.now())
        entry = [None, parent_db_id, child_value, GET_RANKING_DATE, created_at]
        table = "td_log"
        DB().conn(table).Insert(table, entry)
class Path:
    """File locations used by the script."""

    def __init__(self):
        # Stateless; nothing to set up.
        pass

    def parent_genreid_db(self):
        """Return the relative path of the ``tm_genreid_parent`` database file."""
        db_file = "db/master/tm_genreid_parent.db"
        return db_file
class Parse:
    """Extract products and genre links from one Amazon ranking page.

    The CSS class / id names used here reflect the page markup the script was
    written against; they need updating if Amazon changes its HTML.
    """

    # Alt text of the star icon, e.g. "5つ星のうち 4.5"; group 1 is the score.
    _RATING_RE = re.compile(r"5つ星のうち\s(.*)")

    def __init__(self, html):
        self.soup = BeautifulSoup(html)

    @staticmethod
    def _rating_from_text(text):
        """Return the score part of a star-rating text, or "" if it does not match."""
        match = Parse._RATING_RE.search(text)
        if match:
            return match.group(1)
        return ""

    def ranking_prd_get(self, ranking_type_id, parent_db_id, child_db_id=""):
        """Parse every ranked product on the page into a ranking row.

        Side effects: a search word or product that is not in the database
        yet is inserted (``tm_search_word`` / ``tm_prd``) so that its id can
        be referenced.

        Returns a list of positional rows (one per product) in the form the
        caller hands to ``DB.ranking_prd_data_insert``; the list is empty
        when the page contains no ``zg_item_normal`` blocks.
        """
        def rank(item):
            return item.find("span", attrs={"class": "zg_rankNumber"}).string

        def code(item):
            # Last path segment of the product link.
            return item.find("div", attrs={"class": "zg_title"}).find("a")["href"].split("/")[-1].rstrip()

        def title(item):
            return item.find("div", attrs={"class": "zg_title"}).find("a").string

        def img_url(item):
            # Only the image file name (last path segment) is kept.
            return item.find("div", attrs={"class": "zg_itemImage_normal"}).find("img")["src"].split("/")[-1]

        def price(item):
            p_soup = item.find("span", attrs={"class": "price"})
            if p_soup:
                # "\xef\xbf\xa5" is the UTF-8 encoding of the full-width yen sign.
                return str(p_soup.find("b").string).replace("\xef\xbf\xa5 ", "")
            return ""

        def search_word(item):
            soup = item.find("div", attrs={"class": "zg_byline"})
            # ``.string`` is None when the byline holds nested tags.
            if soup and soup.string:
                return soup.string.strip()
            return ""

        def rating_point(item):
            soup = item.find("span", attrs={"class": "a-icon-alt"})
            if soup:
                # An unexpected alt text yields "" instead of an AttributeError.
                return self._rating_from_text(str(soup.string))
            return ""

        def rating_cnt(item):
            soup = item.find("span", attrs={"class": "a-size-small"})
            if soup:
                link = soup.find("a")
                if link:
                    return link.string
            return 0

        db = DB()
        now = datetime.datetime.now()
        rows = []
        for l_item in self.soup.findAll("div", attrs={"class": "zg_item_normal"}):
            search_word_str = search_word(l_item)
            search_word_id = db.search_word_id(search_word_str)
            prd_code = code(l_item)
            prd_id = db.prd_select(prd_code)
            price_str = price(l_item)
            rating_point_str = rating_point(l_item)
            rating_cnt_str = rating_cnt(l_item)
            if not prd_id:
                # First sighting of this product: register it, then read back its id.
                new_prd = [None, prd_code, title(l_item), img_url(l_item), rating_point_str,
                           rating_cnt_str, price_str, search_word_id, search_word_str, 0, now]
                db.conn("tm_prd").Insert("tm_prd", new_prd)
                prd_id = db.prd_select(prd_code)
            s_rank = rank(l_item)
            u_key = "_".join([GET_RANKING_DATE, str(prd_code), str(ranking_type_id)])
            # Parenthesised single-argument print: same output as the
            # Python 2 print statement.
            print(u_key)
            # NOTE(review): the original put the rating point in both of the
            # two slots after the price and never used the review count it
            # had computed.  The second slot is assumed to be the review
            # count, mirroring the tm_prd row above -- confirm against the
            # td_ranking_prd schema.
            rows.append([None, u_key, prd_id, prd_code, ranking_type_id, parent_db_id,
                         child_db_id, s_rank, price_str,
                         rating_point_str,
                         rating_cnt_str,
                         0,
                         GET_RANKING_DATE,
                         now,
                         ])
        return rows

    def _genre_links(self, skip_first=False):
        """Collect ``[last href segment, link text]`` for links under ``ul#zg_browseRoot``.

        Returns an empty list when the list element is missing, so callers
        that test the result for truthiness simply skip the page.
        """
        roots = self.soup.findAll("ul", attrs={"id": "zg_browseRoot"})
        if not roots:
            return []
        links = []
        for i, li in enumerate(roots[0].findAll("li")):
            if skip_first and i == 0:
                continue
            a_soup = li.find("a")
            if a_soup:
                links.append([a_soup["href"].split("/")[-1], str(a_soup.string)])
        return links

    def ll_child_genreid(self):
        """Return the genre links of the page, ignoring the first ``<li>``."""
        return self._genre_links(skip_first=True)

    def parent_genreid(self):
        """Return the genre links of the page from every ``<li>``."""
        return self._genre_links()
class Req:
    """Thin HTTP GET wrapper around ``requests``."""

    def __init__(self):
        pass

    def open(self, url, timeout=30):
        """Print ``url``, fetch it and return the response body as text.

        ``timeout`` (seconds) is passed to ``requests.get``; without it a
        stalled connection would block the whole run indefinitely.  A request
        that exceeds it raises a ``requests`` timeout exception.
        """
        # Parenthesised single-argument print: same output as the Python 2
        # print statement.
        print(url)
        return requests.get(url, timeout=timeout).text
class Url:
    """Builders for the amazon.co.jp ranking URLs the script requests."""

    def __init__(self):
        # Stateless; nothing to set up.
        pass

    def ranking_page_url(self, p_num, raking_eu_name, parent_eu_name, child_genreid=False):
        """Return the ranking page URL of a parent genre or one of its children.

        With a falsy ``child_genreid`` the parent page URL is returned and
        ``p_num`` is not used; otherwise the child id is appended as a path
        segment and ``p_num`` as the URL fragment.
        """
        parent_page = "https://www.amazon.co.jp/gp/%s/%s/" % (raking_eu_name, parent_eu_name)
        if child_genreid:
            child_part = "%s/#%s" % (child_genreid, p_num)
            return parent_page + child_part
        return parent_page

    def s_parent_ranking_url(self, eu_name):
        """Return the bestsellers page URL of the parent genre ``eu_name``."""
        template = "https://www.amazon.co.jp/gp/bestsellers/%s/ref=zg_bs_nav_0"
        return template % eu_name

    def amazon_popular_ranking(self):
        """Return the URL of the top "most-gifted" ranking page."""
        top_page = "https://www.amazon.co.jp/gp/most-gifted/"
        return top_page
class DB:
    """Query helpers on top of ``myDB.Database``; each table lives in its own file."""

    def __init__(self):
        pass

    @staticmethod
    def _quote(value):
        """Return ``value`` as text that is safe inside a single-quoted SQL literal.

        Single quotes are doubled (the standard SQL escape).  Scraped text
        such as a byline containing an apostrophe would otherwise end the
        literal early and break or alter the statement.
        """
        # "%s" formatting keeps unicode input as unicode under Python 2, the
        # same way the original string interpolation did.
        return ("%s" % (value,)).replace("'", "''")

    def ranking_prd_data_insert(self, ll_prd_data):
        """Bulk-insert parsed ranking rows into ``td_ranking_prd``."""
        tl = "td_ranking_prd"
        self.conn(tl).Insert_All(tl, ll_prd_data)

    def log_check(self, parent_db_id, child_db_id=False):
        """Return True if ``td_log`` already has this genre for ``GET_RANKING_DATE``.

        A falsy ``child_db_id`` looks for the parent-only entry, which is
        stored with an empty child id.
        """
        tl = "td_log"
        sql = "select id from %s where " % tl
        sql += "parent_genreid_id = %s and " % parent_db_id
        if child_db_id:
            sql += "child_genreid_id = %s and " % child_db_id
        else:
            sql += "child_genreid_id = '' and "
        sql += "ranking_get_date = '%s' limit 1" % GET_RANKING_DATE
        # Parenthesised single-argument print: same output as the Python 2
        # print statement.
        print(sql)
        row = self.conn(tl).Select(sql)
        if row:
            return True
        return False

    def prd_select(self, prd_code):
        """Return the ``tm_prd`` id of ``prd_code``, or False when it is unknown."""
        tl = "tm_prd"
        sql = "select id from %s where prd_code = '%s' limit 1" % (tl, self._quote(prd_code))
        row = self.conn(tl).Select(sql)
        if row:
            return row["id"]
        return False

    def search_word_id(self, word):
        """Return the ``tm_search_word`` id of ``word``, inserting the word if new."""
        tl = "tm_search_word"
        sql = "select id from %s where word = '%s'" % (tl, self._quote(word))
        row = self.conn(tl).Select(sql)
        if row:
            return row["id"]
        # Not registered yet: insert it, then read the new id back.
        self.conn(tl).Insert(tl, [None, word, 0, ""])
        return self.conn(tl).Select(sql)["id"]

    def parent_rel_child_genreid_select_l(self, eu_name):
        """Return the child genres whose ``current_genreid`` equals ``eu_name``."""
        tl = "tm_genreid_child"
        sql = "select id, jp_name, genreid from %s where current_genreid = '%s'" % (tl, self._quote(eu_name))
        return self.conn(tl).Select_Many(sql)

    def parent_genreid_update(self, parent_db_id):
        """Set ``update_date`` of the given parent genre row to the current time."""
        tl = "tm_genreid_parent"
        now = datetime.datetime.now()
        sql = "update %s set update_date = '%s' where id = %s" % (tl, now, parent_db_id)
        return self.conn(tl).Update(sql)

    def child_genreid_insert(self, ll_child_genreid, p_row):
        """Bulk-insert child genres (``[genreid, name]`` pairs) under parent ``p_row``."""
        tl = "tm_genreid_child"
        now = datetime.datetime.now()
        ll = [[None, i[1], i[0], p_row["id"], p_row["eu_name"], now, ""] for i in ll_child_genreid]
        self.conn(tl).Insert_All(tl, ll)

    def genreid_selects_ll(self):
        """Return every row of ``tm_genreid_parent``."""
        tl = "tm_genreid_parent"
        return self.conn(tl).Select_Many("select * from %s" % tl)

    def parent_genreid_insert(self, l_parent_genreid):
        """Bulk-insert parent genres given as two-element lists."""
        tl = "tm_genreid_parent"
        now = datetime.datetime.now()
        ll = [[None, i[0], i[1], 0, now, ""] for i in l_parent_genreid]
        self.conn(tl).Insert_All(tl, ll)

    def conn(self, tbl_name):
        """Open and return a connection to the database file of ``tbl_name``.

        The table prefix picks the directory ("td" -> ``db/data``,
        "tm" -> ``db/master``); any other prefix raises ``KeyError``.
        A new connection object is created on every call.
        """
        db_dic = {"td": "data", "tm": "master"}
        db = myDB.Database()
        db.db_path = "db/%s/%s.db" % (db_dic[tbl_name[:2]], tbl_name)
        db.Connection()
        return db
def main():
    """Create the controller and run the jobs it has enabled."""
    Ctrl().process()
# Run the jobs only when executed as a script, not when imported.
if __name__ == "__main__":
    main()