スクレイピングした情報をSlackへ通知
やること
インストール
MySQL 5.7
以下の記事参照
CentOS6にMySQL5.7をyumでインストール - Qiita
文字コードの設定はクライアント側も必要なので以下の対応も行う。
vi /etc/my.cnf # 以下を各セクションへ追記 [mysqld] character-set-server=utf8 [client] default-character-set=utf8
MySQLdb(mysqlclient==1.3.12)
pip install mysqlclient
Slack(slackweb==1.0.5)
pip install slackweb
SlackのWebhook設定
以下の設定ページで新規または既存チャンネルへのインテグレーションの追加を行う。
設定完了後にWebhooへのURLが表示されるのでコピー。 (https://hooks.slack.com/services/*****こんな感じ)
サンプル
MySQLdb
# -*- coding:utf-8 -*- import MySQLdb def db_sample(): # 接続する con = MySQLdb.connect( user='YOUR_NAME', passwd='YOUR_PASSWORD', host='YOUR_HOST', db='YOUR_DB') # カーソルを取得する ## レスポンスをディクショナリで利用するための命令 cur= con.cursor(MySQLdb.cursors.DictCursor) # クエリを実行する sql = "SELECT * FROM test" cur.execute(sql) # 実行結果をすべて取得する rows = cur.fetchall() # 一行ずつ表示する for row in rows: print( "%s, %s, %s, %s" % (row['id'], row['name'], row['created_at'], row['updated_at']) ) cur.close con.close if __name__ == "__main__": db_sample()
slackweb
# -*- coding:utf-8 -*- import slackweb slack = slackweb.Slack(url="https://hooks.slack.com/services/*****") slack.notify(text="hell world")
下準備の完了
以上が必要なライブラリと設定の追加になる。 これを踏まえ、Scrapy側で操作する。
スクレイピングの実装
とあるWebページの情報を取得し、更新があればSlackへ通知対応方法になります。 迷惑を掛けるとまずいので、リンク等は架空のものとします。
プロジェクトの作成
scrapy startproject test_to_slack
spiders
実際にスクレイピングしたい処理をクラス内に記述します。 必要な処理としては以下の2点になります。
- 取得URLの設定
- データの抽出
スパイダーをテストしたいだけなら、他の実装は不要です。 このスパイダーは、該当URL先のHTMLから特定の文字列が含まれてるURLを抽出します。
# -*- coding: utf-8 -*- import scrapy from scrapy.linkextractors import LinkExtractor from scrapy.spiders import CrawlSpider, Rule import pytz, datetime, calendar class HogehogeSpider(CrawlSpider): name = 'hogehoge' # リンクを抽出するために考慮されるドメインを含む文字列の単一の値, またはリスト。 allowed_domains = ['www.hogehogehoge.jp'] start_urls = ['http://www.hogehogehoge.jp'] rules = ( # スパイダーがクロールを開始するURLリストの指定と、callback関数の指定。 ## 正規表現での記述が可能なため、該当の全URLを対象とする。今回は/hoge_hoge/のみ。 ## callbackで呼び出す関数を指定する。 Rule(LinkExtractor(allow='/hoge_hoge/$'), callback='parse_hoge'), ) def parse_hoge(self, response): i = {} # xpath/cssセレクタを使って該当の情報を抽出する。 ## 以下の場合は、'<html><body><div><ul><li><a>'のようなHTMLのhrefに"comic"が含まれている情報を抽出している。 i['count'] = len(response.xpath('//body/div/ul/li/a[contains(@href,"comic")]/@href')) tz = pytz.timezone("Asia/Tokyo") now = datetime.datetime.now(tz) i['updated_at'] = i['created_at'] = calendar.timegm(now.utctimetuple()) yield i
item
スクレイピングした情報を管理するためのクラス。 例えば、スパイダーでこのItemクラスに管理しているフィールドのみ使用したい場合以下のような実装になります。 フィールドで管理されていないフィールドを指定した場合はエラーを返します。
from Hogehoge.items import HogehogeItem ... i = HogehogeItem() i['count'] = len(response.xpath('//body/div/ul/li/a[contains(@href,"comic")]/@href') # 以下のフィールドはItemクラスにないのでエラー i['test'] = 9999
import scrapy class HogehogeItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() count = scrapy.Field() created_at = scrapy.Field() updated_at = scrapy.Field()
settings
各種設定を記述します。 今回は、使用するパイプラインと、MySQL、Slack関連の設定を行ないました。
ITEM_PIPELINES = { 'hogehoge.pipelines.HogehogePipeline': 1, } ... MYSQL_HOST = 'YOUR_HOST' MYSQL_DB = 'YOUR_DB' MYSQL_USER = 'YOUR_USER' MYSQL_PASSWORD = 'YOUR_PASSWORD' SLACK_WEBHOOK = 'https://hooks.slack.com/services/***'
pipline
MySQLへの登録やSlackへの通知処理を行ないます。
import MySQLdb import slackweb class HogehogePipeline(object): #def process_item(self, item, spider): # return item def __init__(self, mysql_host, mysql_db, mysql_user, mysql_passwd): self.mysql_host = mysql_host self.mysql_db = mysql_db self.mysql_user = mysql_user self.mysql_passwd = mysql_passwd @classmethod def from_crawler(cls, crawler): return cls( mysql_host = crawler.settings.get('MYSQL_HOST'), mysql_db = crawler.settings.get('MYSQL_DB'), mysql_user = crawler.settings.get('MYSQL_USER'), mysql_passwd = crawler.settings.get('MYSQL_PASSWORD') ) def open_spider(self, spider): self.conn = MySQLdb.connect( user = self.mysql_user, passwd = self.mysql_passwd, host = self.mysql_host, db = self.mysql_db, charset="utf8" ) self.cur = self.conn.cursor(MySQLdb.cursors.DictCursor) def close_spider(self, spider): self.cur.close self.conn.close def process_item(self, item, spider): select_sql = "SELECT * FROM `test`;" self.cur.execute(select_sql) webhook_url = spider.settings['SLACK_WEBHOOK'] slack = slackweb.Slack(url=webhook_url) # 登録済みなら登録件数と今回取得した件数の比較を行う if self.cur.rowcount: # 登録件数が異なれば更新 if self.cur.fetchone()['count'] != item['count']: update_sql = "UPDATE `test` SET count = %s, updated_at = %s;" self.cur.execute(update_sql, (item['count'], item['updated_at'])) self.conn.commit() to_comic_url = "http://www.hogehoge.jp/comic%s.png" % item['count'] slack.notify(text=to_comic_url) else: # 同じならupdated_atのみ更新 update_sql = "UPDATE `test` SET updated_at = %s WHERE count = %s;" self.cur.execute(update_sql, (item['updated_at'], item['count'])) self.conn.commit() else: # 未登録なら登録する insert_sql = "INSERT IGNORE INTO `test` (`count`, `created_at`, `updated_at`) VALUES (%s, %s, %s);" self.cur.execute(insert_sql, (item['count'], item['created_at'], item['updated_at'])) self.conn.commit() to_comic_url = "http://www.hogehoge.jp/comic01.png" slack.notify(text=to_comic_url)