How I made Face ID for the front door

import requests
import configparser
import logging
import os
import sys
import io
import face_recognition
import configparser
import paho.mqtt.client as mqtt
from datetime import datetime
from PIL import Image

delimeter="/"
if(os.name=='nt'):
    delimeter="\\"

root_path = os.path.dirname(__file__) + delimeter

config = configparser.ConfigParser()
config.read(root_path + "fr.conf")
url = config['image']['url']
topic_subscribe = config['mqtt']['topic_subscribe']
topic_publish = config['mqtt']['topic_publish']
mqtt_server = config['mqtt']['mqtt_server']
mqtt_port = int(config['mqtt']['mqtt_port'])
try:
    mqtt_user = config['mqtt']['mqtt_user']
    mqtt_password = config['mqtt']['mqtt_password']
except:
    pass

known_path = config['dirs']['known']
new_path = config['dirs']['new']
unknown_path = config['dirs']['unknown']
no_faces = config['dirs']['no_faces']
logfile = config['log']['logfile']
loglevel = 10 * int(config['log']['loglevel'])
comparison_threshold = float(config['compare']['comparison_threshold'])

def parse_int_tuple(input):
    return tuple(int(k.strip()) for k in input[1:-1].split(','))

try:
    crop_rect = parse_int_tuple(config['image']['crop_rect'])
except:
    pass


logging.basicConfig(level=loglevel, filename=(root_path + logfile),filemode="w",format="%(asctime)s %(levelname)s %(message)s")
persons = []

def add_known_person():
    listdir = None
    try:
        listdir = os.listdir((root_path + known_path))
    except Exception as exc:
        logging.error("Can't list known person dir " + (root_path + known_path) + " with error: " + str(exc))
        sys.exit(1)

    for file_path in listdir:
        # check if current file_path is a file
        if os.path.isfile(os.path.join((root_path + known_path), file_path)):
            person_name = file_path.split(".")[0]
            logging.info("Try to add known person from file " + file_path)
            # load image from file
            known_image = face_recognition.load_image_file(root_path + known_path + delimeter + file_path)
            # recognize face
            face_enc = face_recognition.face_encodings(known_image)
            # if face found
            if(len(face_enc) > 0):
                # get first face
                person_encoding = face_enc[0]
                # add face to known persons array
                persons.append([person_name, [person_encoding]])
                logging.info("Add known person: " + person_name)
            else:
                logging.error("Face not found in file " + file_path)

def get_image_from_url(file_name):
    result = False
    r = None
    try:
        # Get image from URL
        r = requests.get(url)
        logging.info("Get image success")
    except Exception as exc:
         logging.error("Can't get image error:" + str(exc))
    if r is not None:
        new_file_name = file_name
        new_file_path = new_path + delimeter + new_file_name
        # Load image from request content
        im = Image.open(io.BytesIO(r.content))
        try:
            crop_rect
            # Crop image
            im = im.crop(crop_rect)
        except:
            pass
        try:
            # Save image to folder new
            im.save(root_path + new_file_path)
            result = True
            logging.info("Save image to " + (root_path + new_file_path))
        except Exception as exc:
            logging.error("Can't save image " + root_path + new_file_path + " with error: " + str(exc))
        return result
            

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    if(rc != 0):
        logging.info("MQTT connection error with result code "+str(rc))
        print("MQTT connection error with result code "+str(rc))
    else:
        logging.info("MQTT connection successful with result code "+str(rc))
        print("MQTT connection successful with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe(topic_subscribe)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    known_person = "unknown"
    logging.info(msg.topic+" "+str(msg.payload.decode()))
    # Loop face recognition
    i = 0
    try:
        # Read value from topic
        i = int(msg.payload.decode())
    except ValueError:
        logging.error("Receive non int value in topic " + msg.topic)
    for j in range(i):
        now = datetime.now()
        new_file_name = now.strftime("%Y_%m_%d-%H_%M_%S-") + str(j) + '.jpg'
        if get_image_from_url(new_file_name):
            new_file_path = new_path + delimeter + new_file_name
            # Load image from file
            new_image = face_recognition.load_image_file(root_path + new_file_path)
            # Check face count on image
            face_locations = face_recognition.face_locations(new_image)
            logging.info("Face location count: " + str(len(face_locations)))
            if(len(face_locations) > 0):
                # Recognize face on image
                unknown_faces = face_recognition.face_encodings(new_image, face_locations)
                logging.info("face recognition count: " + str(len(unknown_faces)))
                # Compare face with known faces
                for unknown_face in unknown_faces:
                    for i in range(len(persons)):
                        results = face_recognition.compare_faces(persons[i][1], unknown_face, comparison_threshold)
                        logging.info("compare face with " + persons[i][0] + " result: " + str(results))
                        if(results[0]):
                            known_person = persons[i][0]
                            client.publish(topic_publish + 'person', "{ \"known_person\": \"" + known_person + "\" }", 0, False)
                            logging.info("client publish { \"known_person\": \"" + known_person + "\" }")
            else:
                #print("no faces")
                known_person = "nofaces"
            logging.info("person: " + known_person)
            client.publish(topic_publish + known_person, now.strftime("%Y-%m-%d %H:%M:%S"), 0, True)
            if(known_person == "nofaces"):
                try:
                    os.remove(root_path + new_file_path)
                    logging.info("Remove file " + root_path + new_file_path)
                except Exception as exc:
                    logging.error("Can't remove file " + root_path + new_file_path + ": " + str(exc))
            else:
                try:
                    os.replace(root_path + new_file_path, root_path + known_person + delimeter + new_file_name)
                    logging.info("Move file " + root_path + new_file_path + " to " + root_path + known_person + delimeter + new_file_name)
                except Exception as exc:
                    logging.error("Can't move file " + root_path + new_file_path + " to " + root_path + known_person + delimeter + new_file_name + ": " + str(exc))
            if(known_person not in ("nofaces", "unknown")):
                break

if __name__ == "__main__":
    logging.info("Start face recognition")
    add_known_person()
    connected_flag = 0
    client = mqtt.Client()
    try:
        mqtt_password
        client.username_pw_set(mqtt_user, mqtt_password)
    except NameError:
        logging.warn("MQTT password is not set. Trying to connect without password.")
    client.enable_logger(logger=logging)
    client.on_connect = on_connect
    client.on_message = on_message
    while not connected_flag:
        try:
            client.connect(mqtt_server, mqtt_port, 60)
            connected_flag = 1
        except Exception as exc:
            logging.error("MQTT connection error: " + str(exc))
    client.loop_forever()

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *