スクレイピングした情報をSlackへ通知

やること

  1. MySQL導入
  2. MySQLクライアント導入
  3. Slackクライアント導入

インストール

MySQL 5.7

以下の記事参照

CentOS6にMySQL5.7をyumでインストール - Qiita

文字コードの設定はクライアント側も必要なので以下の対応も行う。

vi /etc/my.cnf

# 以下を各セクションへ追記
[mysqld]
character-set-server=utf8

[client]
default-character-set=utf8

MySQLdb(mysqlclient==1.3.12)

pip install mysqlclient

Slack(slackweb==1.0.5)

pip install slackweb

SlackのWebhook設定

以下の設定ページで新規または既存チャンネルへのインテグレーションの追加を行う。

Sign in | Slack

設定完了後にWebhooへのURLが表示されるのでコピー。 (https://hooks.slack.com/services/*****こんな感じ)

サンプル

MySQLdb

# -*- coding:utf-8 -*-

import MySQLdb

def db_sample():
    # 接続する
    con = MySQLdb.connect(
            user='YOUR_NAME',
            passwd='YOUR_PASSWORD',
            host='YOUR_HOST',
            db='YOUR_DB')

    # カーソルを取得する
    ## レスポンスをディクショナリで利用するための命令
    cur= con.cursor(MySQLdb.cursors.DictCursor)

    # クエリを実行する
    sql = "SELECT * FROM test"
    cur.execute(sql)

    # 実行結果をすべて取得する
    rows = cur.fetchall()

    # 一行ずつ表示する
    for row in rows:
        print( "%s, %s, %s, %s" % (row['id'], row['name'], row['created_at'], row['updated_at']) )

    cur.close
    con.close

if __name__ == "__main__":
    db_sample()

slackweb

# -*- coding:utf-8 -*-

import slackweb

slack = slackweb.Slack(url="https://hooks.slack.com/services/*****")
slack.notify(text="hell world")

下準備の完了

以上が必要なライブラリと設定の追加になる。 これを踏まえ、Scrapy側で操作する。

スクレイピングの実装

とあるWebページの情報を取得し、更新があればSlackへ通知対応方法になります。 迷惑を掛けるとまずいので、リンク等は架空のものとします。

プロジェクトの作成

scrapy startproject test_to_slack

spiders

実際にスクレイピングしたい処理をクラス内に記述します。 必要な処理としては以下の2点になります。

  1. 取得URLの設定
  2. データの抽出

スパイダーをテストしたいだけなら、他の実装は不要です。 このスパイダーは、該当URL先のHTMLから特定の文字列が含まれてるURLを抽出します。

# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
import pytz, datetime, calendar

class HogehogeSpider(CrawlSpider):
    name = 'hogehoge'
    # リンクを抽出するために考慮されるドメインを含む文字列の単一の値, またはリスト。
    allowed_domains = ['www.hogehogehoge.jp']
    start_urls = ['http://www.hogehogehoge.jp']

    rules = (
        # スパイダーがクロールを開始するURLリストの指定と、callback関数の指定。
        ## 正規表現での記述が可能なため、該当の全URLを対象とする。今回は/hoge_hoge/のみ。
        ## callbackで呼び出す関数を指定する。
        Rule(LinkExtractor(allow='/hoge_hoge/$'), callback='parse_hoge'),
    )

    def parse_hoge(self, response):
        i = {}
        # xpath/cssセレクタを使って該当の情報を抽出する。
        ## 以下の場合は、'<html><body><div><ul><li><a>'のようなHTMLのhrefに"comic"が含まれている情報を抽出している。
        i['count'] = len(response.xpath('//body/div/ul/li/a[contains(@href,"comic")]/@href'))
        tz = pytz.timezone("Asia/Tokyo")
        now = datetime.datetime.now(tz)
        i['updated_at'] = i['created_at'] = calendar.timegm(now.utctimetuple())

        yield i

item

スクレイピングした情報を管理するためのクラス。 例えば、スパイダーでこのItemクラスに管理しているフィールドのみ使用したい場合以下のような実装になります。 フィールドで管理されていないフィールドを指定した場合はエラーを返します。

from Hogehoge.items import HogehogeItem
...
        i = HogehogeItem()
        i['count'] = len(response.xpath('//body/div/ul/li/a[contains(@href,"comic")]/@href')
        # 以下のフィールドはItemクラスにないのでエラー
        i['test'] = 9999    
import scrapy

class HogehogeItem(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    count = scrapy.Field()
    created_at = scrapy.Field()
    updated_at = scrapy.Field()

settings

各種設定を記述します。 今回は、使用するパイプラインと、MySQL、Slack関連の設定を行ないました。

ITEM_PIPELINES = {
    'hogehoge.pipelines.HogehogePipeline': 1,
}

...

MYSQL_HOST = 'YOUR_HOST'
MYSQL_DB = 'YOUR_DB'
MYSQL_USER = 'YOUR_USER'
MYSQL_PASSWORD = 'YOUR_PASSWORD'
SLACK_WEBHOOK = 'https://hooks.slack.com/services/***'

pipline

MySQLへの登録やSlackへの通知処理を行ないます。

import MySQLdb
import slackweb

class HogehogePipeline(object):
    #def process_item(self, item, spider):
    #    return item

    def __init__(self, mysql_host, mysql_db, mysql_user, mysql_passwd):
        self.mysql_host = mysql_host
        self.mysql_db = mysql_db
        self.mysql_user = mysql_user
        self.mysql_passwd = mysql_passwd

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            mysql_host = crawler.settings.get('MYSQL_HOST'),
            mysql_db = crawler.settings.get('MYSQL_DB'),
            mysql_user = crawler.settings.get('MYSQL_USER'),
            mysql_passwd = crawler.settings.get('MYSQL_PASSWORD')
        )

    def open_spider(self, spider):
        self.conn = MySQLdb.connect(
            user = self.mysql_user,
            passwd = self.mysql_passwd,
            host = self.mysql_host,
            db = self.mysql_db,
            charset="utf8"
        )
        self.cur = self.conn.cursor(MySQLdb.cursors.DictCursor)

    def close_spider(self, spider):
        self.cur.close
        self.conn.close

    def process_item(self, item, spider):
        select_sql = "SELECT * FROM `test`;"
        self.cur.execute(select_sql)

        webhook_url = spider.settings['SLACK_WEBHOOK']
        slack = slackweb.Slack(url=webhook_url)

        # 登録済みなら登録件数と今回取得した件数の比較を行う
        if self.cur.rowcount:
            # 登録件数が異なれば更新
            if self.cur.fetchone()['count'] != item['count']:
                update_sql = "UPDATE `test` SET count = %s, updated_at = %s;"
                self.cur.execute(update_sql, (item['count'], item['updated_at']))
                self.conn.commit()

                to_comic_url = "http://www.hogehoge.jp/comic%s.png" % item['count']
                slack.notify(text=to_comic_url)
            else: # 同じならupdated_atのみ更新
                update_sql = "UPDATE `test` SET updated_at = %s WHERE count = %s;"
                self.cur.execute(update_sql, (item['updated_at'], item['count']))
                self.conn.commit()

        else: # 未登録なら登録する
            insert_sql = "INSERT IGNORE INTO `test` (`count`, `created_at`, `updated_at`) VALUES (%s, %s, %s);"
            self.cur.execute(insert_sql, (item['count'], item['created_at'], item['updated_at']))
            self.conn.commit()

            to_comic_url = "http://www.hogehoge.jp/comic01.png"
            slack.notify(text=to_comic_url)