如何客製化企業 RAG 知識庫?— 從資料庫到知識整合的實戰技術
|

【CAVEDU講堂】Google Mediapipe 深蹲偵測 結合 Arduino 首次接觸就上手

   

作者/圖片來源:CAVEDU 教育團隊

這次要做一個自虐的專題:偵測深蹲是否到位,其實就是檢測大腿與小腿的夾角是否小於指定角度,來看看怎樣使用 MediaPipe 做到吧!本文分成兩個版本:[Mediapipe 結合 Arduino 首次接觸就上手] 以及 [純 Mediapipe (超純)]。兩者的差異在於,前者的 Python 端會根據辨識結果,透過序列埠發送訊號給 Arduino 首次接觸就上手。來看影片吧 (大家都要減肥了QQ)

想深入認識 Google Mediapipe 的朋友請點我看更多

Mediapipe 姿態偵測結合 Arduino 首次接觸就上手

以下是 MediaPipe Pose API 的人體關節點定義,可看到右腿的髖、膝、踝關節編號分別為 24、26、28,左腿則為 23、25、27。

請用以下指令安裝 mediapipe (python)


pip install mediapipe

如果執行本範例程式出現錯誤,請根據本文操作到圖 21。本文執行環境使用 Anaconda (python 3.7)

Python

使用 pose API 偵測兩腿夾角,想挑戰更高難度的話可以把角度調小一點(先不要謝謝)

Mediapipe squat detecting with Arduino Uno


import cv2
import mediapipe as mp
import numpy as np
import time
import json
import serial

# Capture device index 0; main() reads frames from it.
cam = cv2.VideoCapture(0)
# Shortcuts to the MediaPipe pose solution and its drawing helpers.
mppose = mp.solutions.pose
mpdraw = mp.solutions.drawing_utils
# Pose estimator instance, created with the library's default settings.
poses = mppose.Pose()
# Frame height/width; overwritten by main() on every captured frame.
h = 0
w = 0
# Serial link to the Arduino; 9600 baud matches Serial.begin(9600) in the sketch.
# NOTE(review): "COM3" is a Windows port name; adjust for the local machine.
ser = serial.Serial("COM3", 9600)

# time.time() at which the current squat started (0 when none is in progress).
start_time = 0
# True while main() considers a squat to be in progress.
status = False

# Running totals; main() loads them from and saves them to sport_recorder.json.
sport = {
    "name": "Squat",
    "count": 0,
    "calories": 0
}


def logger(count, cals):
    """Append one timestamped progress record to ``log.txt``.

    The record has the form ``"<time.ctime()> count: <count> cals: <cals>"``
    followed by a newline. The file is created in the current working
    directory if it does not exist yet.

    Args:
        count: Repetition total to record.
        cals: Calorie total to record.
    """
    record = f"{time.ctime()} count: {count} cals: {cals}\n"
    # The context manager closes the file even when the write raises.
    with open("log.txt", 'a') as f:
        f.write(record)


def calc_angles(a, b, c):
    """Return the angle at vertex ``b`` between points ``a`` and ``c``.

    Only the first two components (x, y) of each point are used; any further
    components are ignored. The result is in degrees, folded into the range
    0 to 180.

    Args:
        a: First end point, a sequence with at least two numbers.
        b: Vertex point, same layout as ``a``.
        c: Second end point, same layout as ``a``.
    """
    first, vertex, last = (np.array(point) for point in (a, b, c))

    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])

    degrees = np.abs((heading_to_last - heading_to_first) * 180.0 / np.pi)

    # A reflex reading is the same corner measured the long way round.
    return 360 - degrees if degrees > 180 else degrees


def get_landmark(landmarks, part_name):
    """Return the ``[x, y, z]`` coordinates of one named pose landmark.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.
        part_name: Name of a ``mppose.PoseLandmark`` member, e.g.
            ``"RIGHT_HIP"``.
    """
    point = landmarks[mppose.PoseLandmark[part_name].value]
    return [point.x, point.y, point.z]


def get_visibility(landmarks):
    """Return False when either hip landmark has visibility below 0.8.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.
    """
    right_hip = landmarks[mppose.PoseLandmark["RIGHT_HIP"].value]
    left_hip = landmarks[mppose.PoseLandmark["LEFT_HIP"].value]
    return not (right_hip.visibility < 0.8 or left_hip.visibility < 0.8)


def get_body_ratio(landmarks):
    """Return the ratio of the longer leg's height to the average torso height.

    Heights are absolute differences of landmark ``y`` values: shoulder to
    hip for the torso (averaged over both sides) and hip to ankle for each
    leg. The larger of the two leg heights is divided by the torso average.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.

    Returns:
        The ratio as a float, or ``float("inf")`` when the torso height is
        zero so callers comparing against an upper bound see "too large"
        instead of a ZeroDivisionError.
    """
    def y_of(part_name):
        return landmarks[mppose.PoseLandmark[part_name].value].y

    r_body = abs(y_of("RIGHT_SHOULDER") - y_of("RIGHT_HIP"))
    l_body = abs(y_of("LEFT_SHOULDER") - y_of("LEFT_HIP"))
    avg_body = (r_body + l_body) / 2

    r_leg = abs(y_of("RIGHT_HIP") - y_of("RIGHT_ANKLE"))
    l_leg = abs(y_of("LEFT_HIP") - y_of("LEFT_ANKLE"))

    if avg_body == 0:
        # Degenerate detection (shoulders and hips at the same height).
        return float("inf")

    return max(r_leg, l_leg) / avg_body


def get_knee_angle(landmarks):
    """Return the knee angles ``[right, left, middle]`` in degrees.

    ``right`` and ``left`` are the hip-knee-ankle angles computed from the
    RIGHT_* and LEFT_* landmarks. ``middle`` is the same angle computed from
    the midpoints of the two hips, two knees and two ankles.

    Args:
        landmarks: Indexable collection of landmarks accepted by
            ``get_landmark``.
    """
    r_hip = get_landmark(landmarks, "RIGHT_HIP")
    l_hip = get_landmark(landmarks, "LEFT_HIP")

    r_knee = get_landmark(landmarks, "RIGHT_KNEE")
    l_knee = get_landmark(landmarks, "LEFT_KNEE")

    r_ankle = get_landmark(landmarks, "RIGHT_ANKLE")
    l_ankle = get_landmark(landmarks, "LEFT_ANKLE")

    r_angle = calc_angles(r_hip, r_knee, r_ankle)
    l_angle = calc_angles(l_hip, l_knee, l_ankle)

    def midpoint(p, q):
        # The points are plain lists, so `p + q` would concatenate them
        # rather than add coordinates; average element by element instead.
        return [(u + v) / 2 for u, v in zip(p, q)]

    mid_angle = calc_angles(
        midpoint(r_hip, l_hip),
        midpoint(r_knee, l_knee),
        midpoint(r_ankle, l_ankle),
    )

    return [r_angle, l_angle, mid_angle]


def _put_label(image, text, y, color):
    """Draw one line of overlay text on ``image`` at x=10 and height ``y``."""
    cv2.putText(image, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 1,
                cv2.LINE_AA)


def _draw_knee_label(image, side, angle, y):
    """Overlay one knee angle, coloured by how deep the bend is."""
    if angle < 120:
        _put_label(image, "{}: Down {:.1f}".format(side, angle), y, (0, 255, 0))
    elif angle < 130:
        _put_label(image, "{}: ??? {:.1f}".format(side, angle), y, (0, 255, 255))
    else:
        _put_label(image, "{}: Up {:.1f}".format(side, angle), y, (0, 0, 255))


def _send_totals():
    """Send the current totals to the Arduino as ``a<count>`` and ``b<calories>``."""
    ser.write(f"a{sport['count']}\n".encode())
    ser.write(f"b{sport['calories']}\n".encode())


def main():
    """Run the webcam squat counter and mirror its state to the Arduino.

    Previous totals are loaded from ``sport_recorder.json`` when it holds a
    record for the same exercise name. Each frame is mirrored, passed to the
    pose estimator and annotated with both knee angles. A squat starts when
    the floored average knee angle drops below 120 while the body ratio is
    below 1.2, and ends when the average rises above 160. A squat that
    lasted more than 3 s (and less than 3000 s) increments the count, adds
    ``int(0.66 * seconds)`` calories, is logged and is sent to the Arduino.

    Every frame also sends one command line over serial: ``command_1`` while
    a squat is in progress, ``command_2`` once it has been held for more
    than 3 s, and ``command_4`` when no squat is in progress or no pose was
    found. Press ``q`` in the preview window to quit.

    The totals are written back to ``sport_recorder.json`` and the camera
    and window are released on exit, including when the loop ends with an
    exception.

    Raises:
        SystemExit: If the camera could not be opened.
    """
    global h, w, start_time, status

    if not cam.isOpened():
        print("Camera not open")
        raise SystemExit

    # Resume the previous totals when a record for the same exercise exists.
    try:
        with open("sport_recorder.json", "r") as f:
            prevdata = json.load(f)
        if sport['name'] == prevdata['name']:
            sport['count'] = prevdata['count']
            sport['calories'] = prevdata['calories']
            print("Read Success!")
    except (OSError, ValueError, KeyError, TypeError):
        # Missing, unreadable or malformed record: keep the defaults.
        print("Read Error...")

    _send_totals()

    cv2.namedWindow('frame', cv2.WINDOW_FREERATIO)

    try:
        while True:
            ret, frame = cam.read()
            if not ret:
                print("Read Error")
                break
            frame = cv2.flip(frame, 1)
            rgbframe = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            poseoutput = poses.process(rgbframe)
            h, w, _ = frame.shape
            preview = frame.copy()

            if poseoutput.pose_landmarks:
                mpdraw.draw_landmarks(preview, poseoutput.pose_landmarks,
                                      mppose.POSE_CONNECTIONS)
                knee_angles = get_knee_angle(poseoutput.pose_landmarks.landmark)
                body_ratio = get_body_ratio(poseoutput.pose_landmarks.landmark)

                # NOTE(review): index 0 is the RIGHT_* angle but is labelled
                # "Left"; presumably intentional because the frame is
                # mirrored before detection - confirm.
                _draw_knee_label(preview, "Left", knee_angles[0], 40)
                _draw_knee_label(preview, "Right", knee_angles[1], 80)

                avg_angle = (knee_angles[0] + knee_angles[1]) // 2

                # Determine the status.
                if status:
                    if avg_angle > 160:
                        status = False
                        pass_time = time.time() - start_time
                        start_time = 0
                        # The upper bound rejects a start_time that was reset
                        # to 0 while the pose was lost.
                        if 3000 > pass_time > 3:
                            sport['count'] = sport['count'] + 1
                            sport['calories'] = sport['calories'] + int(0.66 * pass_time)
                            logger(sport['count'], sport['calories'])
                            _send_totals()
                else:
                    if avg_angle < 120 and body_ratio < 1.2:
                        start_time = time.time()
                        status = True

                status_text = f"{status} : {avg_angle:.1f} {body_ratio:.3f}"
                if status:
                    _put_label(preview, status_text, 120, (0, 255, 0))
                    if time.time() - start_time > 3:
                        ser.write(b'command_2\n')
                    else:
                        ser.write(b'command_1\n')
                else:
                    _put_label(preview, status_text, 120, (0, 0, 255))
                    ser.write(b'command_4\n')
            else:
                ser.write(b'command_4\n')
                start_time = 0

            cv2.imshow('frame', preview)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Persist progress and free the camera even if the loop failed.
        with open("sport_recorder.json", "w") as f:
            f.write(json.dumps(sport))

        cam.release()
        cv2.destroyAllWindows()


# Start the detector only when this file is executed as a script.
if __name__ == '__main__':
    main()

Arduino ino

接收來自 PC 端 Python 程式的指令,並執行對應的 LED、蜂鳴器動作:

arduino receive command from PC (python serial)


// Sketch for the "Arduino 首次接觸就上手" kit
// Google Mediapipe - POSE api
// example: https://cavedu.gitbook.io/cavedu/hangeekduino

#define LED_1_pin 4  // present on the board
#define LED_2_pin 9  // must be wired separately
#define AUDIO_pin 5  // present on the board

// Last values received from the PC via the "a<number>" / "b<number>" lines.
int counter = 0;
int calories = 0;

// stat: set by command_1, cleared by command_4.
// breaker: set once the command_2 beep has played, cleared by command_4.
bool stat = false;
bool breaker = false;

// Most recent line read from the serial port.
String str;

void setup(void)
{
  Serial.begin(9600);

  // init pin states
  pinMode(LED_1_pin, OUTPUT);
  digitalWrite(LED_1_pin, LOW);
  pinMode(LED_2_pin, OUTPUT);
  digitalWrite(LED_2_pin, LOW);
}

void loop(void)
{
  int i;

  if (Serial.available()) {
    // 讀取傳入的字串直到"\n"結尾
    str = Serial.readStringUntil('\n');

    if (str.startsWith("a")) {
      str.remove(0, 1);
      counter = str.toInt();
    }
    else if (str.startsWith("b")) {
      str.remove(0, 1);
      calories = str.toInt();
    }
    else if (str == "command_1") {
      if (!stat) {
        tone(AUDIO_pin, 110, 100);
        delay(100);
        tone(AUDIO_pin, 165, 100);
      }
      stat = true;
      digitalWrite(LED_1_pin, HIGH);
      digitalWrite(LED_2_pin, LOW);
    }
    else if (str == "command_2") {
      digitalWrite(LED_1_pin, HIGH);
      digitalWrite(LED_2_pin, HIGH);
      if (!breaker) {
        tone(AUDIO_pin, 200, 100);
        delay(100);
        tone(AUDIO_pin, 200, 100);
        breaker = true;
      }
    }
    else if (str == "command_4" ) {
      if (stat) {
        tone(AUDIO_pin, 165, 100);
        delay(100);
        tone(AUDIO_pin, 110, 100);
      }
      stat = false;
      breaker = false;
      digitalWrite(LED_1_pin, LOW);
      digitalWrite(LED_2_pin, LOW);
    }
  }
}

純 Mediapipe 版本

可以看出就是取消 serial 相關的程式碼而已,歡迎您也一起來做做看喔!


import cv2
import mediapipe as mp
import numpy as np
import time
import json
#import serial

# Capture device index 0; main() reads frames from it.
cam = cv2.VideoCapture(0)
# Shortcuts to the MediaPipe pose solution and its drawing helpers.
mppose = mp.solutions.pose
mpdraw = mp.solutions.drawing_utils
# Pose estimator instance, created with the library's default settings.
poses = mppose.Pose()
# Frame height/width; overwritten by main() on every captured frame.
h = 0
w = 0
# Serial link intentionally disabled in this MediaPipe-only variant.
#ser = serial.Serial("COM3", 9600)

# time.time() at which the current squat started (0 when none is in progress).
start_time = 0
# True while main() considers a squat to be in progress.
status = False

# Running totals; main() loads them from and saves them to sport_recorder.json.
sport = {
    "name": "Squat",
    "count": 0,
    "calories": 0
}


def logger(count, cals):
    """Append one timestamped progress record to ``log.txt``.

    The record has the form ``"<time.ctime()> count: <count> cals: <cals>"``
    followed by a newline. The file is created in the current working
    directory if it does not exist yet.

    Args:
        count: Repetition total to record.
        cals: Calorie total to record.
    """
    record = f"{time.ctime()} count: {count} cals: {cals}\n"
    # The context manager closes the file even when the write raises.
    with open("log.txt", 'a') as f:
        f.write(record)


def calc_angles(a, b, c):
    """Return the angle at vertex ``b`` between points ``a`` and ``c``.

    Only the first two components (x, y) of each point are used; any further
    components are ignored. The result is in degrees, folded into the range
    0 to 180.

    Args:
        a: First end point, a sequence with at least two numbers.
        b: Vertex point, same layout as ``a``.
        c: Second end point, same layout as ``a``.
    """
    first, vertex, last = (np.array(point) for point in (a, b, c))

    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])

    degrees = np.abs((heading_to_last - heading_to_first) * 180.0 / np.pi)

    # A reflex reading is the same corner measured the long way round.
    return 360 - degrees if degrees > 180 else degrees


def get_landmark(landmarks, part_name):
    """Return the ``[x, y, z]`` coordinates of one named pose landmark.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.
        part_name: Name of a ``mppose.PoseLandmark`` member, e.g.
            ``"RIGHT_HIP"``.
    """
    point = landmarks[mppose.PoseLandmark[part_name].value]
    return [point.x, point.y, point.z]


def get_visibility(landmarks):
    """Return False when either hip landmark has visibility below 0.8.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.
    """
    right_hip = landmarks[mppose.PoseLandmark["RIGHT_HIP"].value]
    left_hip = landmarks[mppose.PoseLandmark["LEFT_HIP"].value]
    return not (right_hip.visibility < 0.8 or left_hip.visibility < 0.8)


def get_body_ratio(landmarks):
    """Return the ratio of the longer leg's height to the average torso height.

    Heights are absolute differences of landmark ``y`` values: shoulder to
    hip for the torso (averaged over both sides) and hip to ankle for each
    leg. The larger of the two leg heights is divided by the torso average.

    Args:
        landmarks: Indexable collection of landmarks, addressed by the
            integer value of ``mppose.PoseLandmark``.

    Returns:
        The ratio as a float, or ``float("inf")`` when the torso height is
        zero so callers comparing against an upper bound see "too large"
        instead of a ZeroDivisionError.
    """
    def y_of(part_name):
        return landmarks[mppose.PoseLandmark[part_name].value].y

    r_body = abs(y_of("RIGHT_SHOULDER") - y_of("RIGHT_HIP"))
    l_body = abs(y_of("LEFT_SHOULDER") - y_of("LEFT_HIP"))
    avg_body = (r_body + l_body) / 2

    r_leg = abs(y_of("RIGHT_HIP") - y_of("RIGHT_ANKLE"))
    l_leg = abs(y_of("LEFT_HIP") - y_of("LEFT_ANKLE"))

    if avg_body == 0:
        # Degenerate detection (shoulders and hips at the same height).
        return float("inf")

    return max(r_leg, l_leg) / avg_body


def get_knee_angle(landmarks):
    """Return the knee angles ``[right, left, middle]`` in degrees.

    ``right`` and ``left`` are the hip-knee-ankle angles computed from the
    RIGHT_* and LEFT_* landmarks. ``middle`` is the same angle computed from
    the midpoints of the two hips, two knees and two ankles.

    Args:
        landmarks: Indexable collection of landmarks accepted by
            ``get_landmark``.
    """
    r_hip = get_landmark(landmarks, "RIGHT_HIP")
    l_hip = get_landmark(landmarks, "LEFT_HIP")

    r_knee = get_landmark(landmarks, "RIGHT_KNEE")
    l_knee = get_landmark(landmarks, "LEFT_KNEE")

    r_ankle = get_landmark(landmarks, "RIGHT_ANKLE")
    l_ankle = get_landmark(landmarks, "LEFT_ANKLE")

    r_angle = calc_angles(r_hip, r_knee, r_ankle)
    l_angle = calc_angles(l_hip, l_knee, l_ankle)

    def midpoint(p, q):
        # The points are plain lists, so `p + q` would concatenate them
        # rather than add coordinates; average element by element instead.
        return [(u + v) / 2 for u, v in zip(p, q)]

    mid_angle = calc_angles(
        midpoint(r_hip, l_hip),
        midpoint(r_knee, l_knee),
        midpoint(r_ankle, l_ankle),
    )

    return [r_angle, l_angle, mid_angle]


def _put_label(image, text, y, color):
    """Draw one line of overlay text on ``image`` at x=10 and height ``y``."""
    cv2.putText(image, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 1, color, 1,
                cv2.LINE_AA)


def _draw_knee_label(image, side, angle, y):
    """Overlay one knee angle, coloured by how deep the bend is."""
    if angle < 120:
        _put_label(image, "{}: Down {:.1f}".format(side, angle), y, (0, 255, 0))
    elif angle < 130:
        _put_label(image, "{}: ??? {:.1f}".format(side, angle), y, (0, 255, 255))
    else:
        _put_label(image, "{}: Up {:.1f}".format(side, angle), y, (0, 0, 255))


def main():
    """Run the webcam squat counter (MediaPipe only, no serial output).

    Previous totals are loaded from ``sport_recorder.json`` when it holds a
    record for the same exercise name. Each frame is mirrored, passed to the
    pose estimator and annotated with both knee angles. A squat starts when
    the floored average knee angle drops below 120 while the body ratio is
    below 1.2, and ends when the average rises above 160. A squat that
    lasted more than 3 s (and less than 3000 s) increments the count, adds
    ``int(0.66 * seconds)`` calories and is logged. Press ``q`` in the
    preview window to quit.

    The totals are written back to ``sport_recorder.json`` and the camera
    and window are released on exit, including when the loop ends with an
    exception.

    Raises:
        SystemExit: If the camera could not be opened.
    """
    global h, w, start_time, status

    if not cam.isOpened():
        print("Camera not open")
        raise SystemExit

    # Resume the previous totals when a record for the same exercise exists.
    try:
        with open("sport_recorder.json", "r") as f:
            prevdata = json.load(f)
        if sport['name'] == prevdata['name']:
            sport['count'] = prevdata['count']
            sport['calories'] = prevdata['calories']
            print("Read Success!")
    except (OSError, ValueError, KeyError, TypeError):
        # Missing, unreadable or malformed record: keep the defaults.
        print("Read Error...")

    cv2.namedWindow('frame', cv2.WINDOW_FREERATIO)

    try:
        while True:
            ret, frame = cam.read()
            if not ret:
                print("Read Error")
                break
            frame = cv2.flip(frame, 1)
            rgbframe = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            poseoutput = poses.process(rgbframe)
            h, w, _ = frame.shape
            preview = frame.copy()

            if poseoutput.pose_landmarks:
                mpdraw.draw_landmarks(preview, poseoutput.pose_landmarks,
                                      mppose.POSE_CONNECTIONS)
                knee_angles = get_knee_angle(poseoutput.pose_landmarks.landmark)
                body_ratio = get_body_ratio(poseoutput.pose_landmarks.landmark)

                # NOTE(review): index 0 is the RIGHT_* angle but is labelled
                # "Left"; presumably intentional because the frame is
                # mirrored before detection - confirm.
                _draw_knee_label(preview, "Left", knee_angles[0], 40)
                _draw_knee_label(preview, "Right", knee_angles[1], 80)

                avg_angle = (knee_angles[0] + knee_angles[1]) // 2

                # Determine the status.
                if status:
                    if avg_angle > 160:
                        status = False
                        pass_time = time.time() - start_time
                        start_time = 0
                        # The upper bound rejects a start_time that was reset
                        # to 0 while the pose was lost.
                        if 3000 > pass_time > 3:
                            sport['count'] = sport['count'] + 1
                            sport['calories'] = sport['calories'] + int(0.66 * pass_time)
                            logger(sport['count'], sport['calories'])
                else:
                    if avg_angle < 120 and body_ratio < 1.2:
                        start_time = time.time()
                        status = True

                status_text = f"{status} : {avg_angle:.1f} {body_ratio:.3f}"
                if status:
                    _put_label(preview, status_text, 120, (0, 255, 0))
                else:
                    _put_label(preview, status_text, 120, (0, 0, 255))
            else:
                start_time = 0

            cv2.imshow('frame', preview)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Persist progress and free the camera even if the loop failed.
        with open("sport_recorder.json", "w") as f:
            f.write(json.dumps(sport))

        cam.release()
        cv2.destroyAllWindows()


# Start the detector only when this file is executed as a script.
if __name__ == '__main__':
    main()

(本文經CAVEDU同意轉載,原文連結;責任編輯:謝涵如)

CAVEDU 教育團隊

訂閱MakerPRO知識充電報

與40000位開發者一同掌握科技創新的技術資訊!

Author: CAVEDU 教育團隊

CAVEDU 教育團隊是由一群對教育充滿熱情的大孩子所組成的機器人科學教育團隊。致力推動國內機器人教育。

Share This Post On
468 ad

Submit a Comment

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *