前段時間、卒業設計のためにラズベリーパイ 4B を購入し、卒業発表が終わった後はラズベリーパイが放置されていました。ローカルネットワークのクラウドドライブを構築する以外に、以前学んだ OpenCV を使いたいと思い、カメラと人体センサーを購入してラズベリーパイに接続し、簡単な「おはよう」通知機能を作りました。毎朝デスクに座るときに、WeChat で「おはよう」の挨拶と天気予報を送信します。
全体の設計思路は、毎朝 7 時にラズベリーパイが py スクリプトを実行し、人体センサーでデスクの前に人がいるかどうかを検知します。もし人がいればカメラを開いて本人かどうかを判断し、本人であれば WeChat でメッセージを送信します。思考はとてもシンプルで、コードも順調に書けて大きな問題には直面しませんでした。ただ、人体センサーを接続する際に正負極を間違えてしまい、1 つの人体センサーを壊してしまいました + 手が少し熱くなりました∠(ᐛ」∠)_
Note
前期準備
🖥️ OpenCV + Mediapipe 環境がインストールされたラズベリーパイ
📹 カメラ
✋ 人体センサー (またはそれ以上)
🔌 母対母のジャンパー線数本
🎤 Wecom 酱を構築したサーバー(またはラズベリーパイ内に構築しても可)
人体センサー#
人体センサーは淘宝で適当なものを購入できますが、感知距離が少し短い(約 1 メートル程度)のものをお勧めします。ここで購入したセンサーは感知距離が 5 から 10 メートルで、最小の感知距離に調整しても、日常使用では少し敏感すぎると感じました。
人体センサーの VCC ピンをラズベリーパイの 4 番 5V ピンに接続し、GND ピンをラズベリーパイの 6 番 GND ピンに接続します。OUT ピンはラズベリーパイの任意の GPIO ピンに接続できますが、ここでは 38 番の GPIO.28 ピンに接続します。
Important
VCC と GND を逆に接続しないように注意してください。そうしないとセンサーが壊れてしまいます。VCC ピンはラズベリーパイの 3.3V ピンまたは 5V ピンに接続するかは、センサーの動作電圧を確認してください。
次に、以下のコードをラズベリーパイに投入して実行します:
import RPi.GPIO as GPIO
import time
GPIO.setmode(GPIO.BCM) #ピン番号をBCMモードに設定
GPIO.setup(20, GPIO.IN) #GPIO.28 (BCM番号は20)を入力モードに設定
while True:
if GPIO.input(20):
print("HIGH")
elif GPIO.input(20) == 0:
print("LOW")
time.sleep(1)
すべてが順調であれば、手を人体センサーの前に置くと HIGH と出力され、しばらく HIGH のまま(センサーに遅延が設定されています)。検知されない場合は LOW と出力されます。これで人体センサーの動作の第一歩のデバッグが完了しました。
顔認識の前期準備#
顔認識には OpenCV + Mediapipe を使用して、カメラで顔の画像をキャッチし、キャッチした顔を API を通じて Megvii のFace++ プラットフォームにアップロードして顔認識の結果を得ます。
Face++ 顔検出#
まず、データベースの顔用に自撮り写真を用意し、multipart/form-data
メソッドで POST して顔検出 APIを呼び出し、face_token を取得します。face_token はこの画像の検出された各顔を表し、一意性を持ち、後続の顔比較に使用できます。
import urllib.request
import urllib.error
import time
import json
def uploadImage(postURL,imgURL):
#multipart/form-dataリクエストボディを構築
border = '----------%s' % hex(int(time.time() * 1000))
postData = []
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
postData.append('XXXXXXXXXXX') #api_key
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
postData.append('XXXXXXXXXXX') #api_secret
postData.append('--%s' % border)
fr = open(imgURL, 'rb')
postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file')
postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
postData.append(fr.read())
fr.close()
postData.append('--%s--\r\n' % border)
for i, d in enumerate(postData):
if isinstance(d, str):
postData[i] = d.encode('utf-8')
http_body = b'\r\n'.join(postData)
req = urllib.request.Request(url=postURL, data=http_body)
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
try:
res = urllib.request.urlopen(req, timeout=5)
qrcont = res.read()
resJson = json.loads(qrcont.decode('utf-8'))
print(resJson['faces'][0]['face_token'])
except urllib.error.HTTPError as e:
print(e.read().decode('utf-8'))
pass
uploadImage(postURL='https://api-cn.faceplusplus.com/facepp/v3/detect',imgURL='./face.jpg')
face_token を取得した後、faceset に保存していないため、72 時間後に自動的に無効になります。次のステップでは、face_token を長期間使用するために保存する顔ライブラリを作成する必要があります。
Face++ 顔ライブラリの作成#
POST メソッドを使用して顔ライブラリ APIを呼び出すことで顔ライブラリを作成できます。
import requests
CreatFacesetdata = {
'api_key': 'XXXXXXXXXXX', #api_key
'api_secret': 'XXXXXXXXXXX' #api_secret
}
CreatFacesetRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/create', data=CreatFacesetdata)
CreatFacesetResJson = CreatFacesetRes.json()
faceset_token = CreatFacesetResJson['faceset_token']
print(faceset_token)
faceset_token を取得した後、顔ライブラリに顔を追加できます。
Face++ 顔の追加#
同様に、顔ライブラリに顔を追加するのも非常に簡単で、上記の手順と同じです。
import requests
UploadFacedata = {
'api_key': 'XXXXXXXXXXX', #api_key
'api_secret': 'XXXXXXXXXXX', #api_secret
'faceset_token': 'XXXXXXXXXXX', #faceset_token
'face_tokens': 'XXXXXXXXXXX' #face_token
}
UploadFaceRes = requests.post('https://api-cn.faceplusplus.com/facepp/v3/faceset/addface', data=UploadFacedata)
UploadFaceResJson = UploadFaceRes.json()
print(UploadFaceResJson)
これで顔認識の前期準備が完了しました。 Face++ のドキュメントセンターには、さまざまな API の詳細な説明がありますので、具体的な使用方法を確認できます。
メインプログラム#
天気情報の取得には、和風天気のリアルタイム天気 APIを使用します。https://devapi.qweather.com/v7/weather/now?location={都市 ID}&key={key}
に GET リクエストを送信して天気状況を取得します。顔比較は顔比較 APIを呼び出すことで実現されます。同様にファイルをアップロードする必要があるため、multipart/form-data
メソッドで API を呼び出します。完全なプログラムは以下の通りです:
import cv2
import time
import mediapipe as mp
import RPi.GPIO as GPIO
import requests
import urllib.request
import urllib.error
import time
import datetime
import json
import os
GPIO.setmode(GPIO.BCM)
GPIO.setup(20, GPIO.IN)
def getWeather(location, Key):
# 天気を取得
weatherData = requests.get(
'https://devapi.qweather.com/v7/weather/now?location=' + location + '&key=' + Key
)
weatherJson = weatherData.json()
nowTemperature = weatherJson['now']['temp']
nowWeather = weatherJson['now']['text']
nowFeelsLike = weatherJson['now']['feelsLike']
nowWindDir = weatherJson['now']['windDir']
nowWindScale = weatherJson['now']['windScale']
return nowWeather, nowFeelsLike, nowTemperature, nowWindDir, nowWindScale
def bodyCheak():
# 人体センサー情報
if GPIO.input(20):
return 1
else:
return 0
def faceCompare(imgURL):
# 顔比較
border = '----------%s' % hex(int(time.time() * 1000))
postData = []
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_key')
postData.append('XXXXXXXXXXX') # api_key
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'api_secret')
postData.append('XXXXXXXXXXX') # api_secret
postData.append('--%s' % border)
fr = open(imgURL, 'rb')
postData.append('Content-Disposition: form-data; name="%s"; filename=" "' % 'image_file1')
postData.append('Content-Type: %s\r\n' % 'application/octet-stream')
postData.append(fr.read())
fr.close()
postData.append('--%s' % border)
postData.append('Content-Disposition: form-data; name="%s"\r\n' % 'face_token2')
postData.append('XXXXXXXXXXX') # face_token
postData.append('--%s--\r\n' % border)
for i, d in enumerate(postData):
if isinstance(d, str):
postData[i] = d.encode('utf-8')
http_body = b'\r\n'.join(postData)
req = urllib.request.Request(
url='https://api-cn.faceplusplus.com/facepp/v3/compare',
data=http_body
)
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % border)
try:
res = urllib.request.urlopen(req, timeout=5)
qrcont = res.read()
resJson = json.loads(qrcont.decode('utf-8'))
if 'confidence' in resJson:
return resJson['confidence'], resJson['thresholds']['1e-5']
else:
print('顔が検出されませんでした')
return 0, 100
except urllib.error.HTTPError as e:
print(e.read().decode('utf-8'))
weather, feelsLike, temperature, windDir, windScale = getWeather(
location='XXXXXXXXXXX',
Key='XXXXXXXXXXX'
)
pushText = 'Vinking、おはようございます~ 今日は' + str(datetime.date.today().year) + '年' \
+ str(datetime.date.today().month) + '月' \
+ str(datetime.date.today().day) + '日' \
+ '、天気は' + weather + '、温度は' + temperature + '°、体感温度は' \
+ feelsLike + '°、' + windDir + 'の' + windScale + '級の風が吹いています。新しい一日を頑張りましょう~'
while True:
if bodyCheak():
Capture = cv2.VideoCapture(0)
mp_face_mesh = mp.solutions.face_mesh
faceMesh = mp_face_mesh.FaceMesh(max_num_faces=1)
faceDetectionTime, videosTime = 0, 0
while True:
success, videos = Capture.read()
imgRGB = cv2.cvtColor(videos, cv2.COLOR_BGR2RGB)
results = faceMesh.process(imgRGB)
faces = []
if results.multi_face_landmarks:
for faceLms in results.multi_face_landmarks:
face = []
for id, lm in enumerate(faceLms.landmark):
ih, iw, ic = videos.shape
x, y = int(lm.x * iw), int(lm.y * ih)
# 顔の左輪郭特徴点
faceLeft = faceLms.landmark[234]
faceLeft_X = int(faceLeft.x * iw)
# 顔の上輪郭特徴点
faceTop = faceLms.landmark[10]
faceTop_Y = int(faceTop.y * ih)
# 顔の右輪郭特徴点
faceRight = faceLms.landmark[454]
faceRight_X = int(faceRight.x * iw)
# 顔の下輪郭特徴点
faceBottom = faceLms.landmark[152]
faceBottom_Y = int(faceBottom.y * ih)
face.append([x, y])
faces.append(face)
videosTime += 1
if len(faces) != 0:
videosTime = 0
faceDetectionTime += 1
if faceDetectionTime >= 20:
cv2.imwrite("./Face.jpg", videos[faceTop_Y: faceBottom_Y, faceLeft_X: faceRight_X])
confidence, thresholds = faceCompare(imgURL='./Face.jpg')
if confidence >= thresholds:
print('成功')
requests.get(
# WeChat通知
'https://speak.vinking.top/?text=[おはよう通知]' + pushText
)
if os.path.exists('./Face.jpg'):
os.remove('./Face.jpg')
Capture.release()
cv2.destroyAllWindows()
GPIO.cleanup()
exit(0)
else:
print('失敗')
if os.path.exists('./Face.jpg'):
os.remove('./Face.jpg')
break
elif videosTime >= 60:
print('タイムアウト')
break
Capture.release()
time.sleep(1)
毎日 7 時に定時実行したい場合は、crontab -e
コマンドを入力し、ファイル内に0 7 * * * python3 {ファイルパス}
と入力し、Ctrl + O で保存、Ctrl + X で終了すれば大丈夫です。
最後に#
ずっと一整套のスマートホームが欲しかったのですが、価格が少し高いため、別の方法で類似の機能を実現することにしました。そして、機能が成功して動作するときは非常に達成感があります。おそらく、直接購入して体験することでは得られない喜びの一種でしょう。
Ps. スマートホーム(例えば、小米のスマートプラグ)を同時に制御したい場合、こちらに小米スマートデバイスの Python ライブラリがありますので、類似の機能を実現できるかもしれません。
この記事はMix Spaceによって xLog に同期更新されました。原始リンクはhttps://www.vinking.top/posts/daily/raspberry-pi-morning-alarm-weather-forecast