Automatic SD Card Reading, Uploading, Cleaning

UX is in fact important. Here is some acceptable UX.

Date
Author
Tags home automation

  • https://wiki.archlinux.org/title/Udev
    • https://askubuntu.com/questions/1239044/how-to-get-script-to-always-launch-on-device-connect
    • https://stackoverflow.com/questions/73648490/how-can-i-create-a-udev-rule-to-run-a-script-when-usb-store-is-plugged-in
  • our rgb matrix
  • https://github.com/Ferk/udev-media-automount/
  • https://github.com/Ferk/udev-media-automount/blob/master/media-automount.rules

Implementation

I found the udev attributes of a specific device with

udevadm info --attribute-walk --path=$(udevadm info --query=path --name=/dev/sdb1)

This was sufficient for our SD card: the idProduct/idVendor values are those of the adapter, and SUBSYSTEM=="block" then selects only the block devices attached to it.

# cat /etc/udev/rules.d/98-camera.rules
ACTION=="add", ATTRS{idProduct}=="8816", ATTRS{idVendor}=="aaaa", SUBSYSTEMS=="usb", SUBSYSTEM=="block", RUN+="/usr/bin/systemctl --no-block restart media-automount@%k.service"
ACTION=="remove", ATTRS{idProduct}=="8816", ATTRS{idVendor}=="aaaa", SUBSYSTEMS=="usb", SUBSYSTEM=="block", RUN+="/usr/bin/systemctl --no-block restart media-automount@%k.service"
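Before writing a rule like this, it can help to check what a device actually reports. The following is a minimal sketch, not part of the setup above: it parses the KEY=VALUE lines printed by `udevadm info --query=property` and checks them against the same vendor/product pair. `parse_properties` and `is_card_reader` are hypothetical helpers, the sample text is made up, and it assumes the block device carries the usual `ID_VENDOR_ID` / `ID_MODEL_ID` properties.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Turn KEY=VALUE lines (as printed by udevadm) into a dict."""
    props = {}
    for line in text.splitlines():
        key, sep, value = line.partition("=")
        if sep:  # skip blank or malformed lines
            props[key.strip()] = value.strip()
    return props


def is_card_reader(props: dict[str, str],
                   vendor: str = "aaaa", product: str = "8816") -> bool:
    """True if the properties describe a block device behind our adapter."""
    return (
        props.get("SUBSYSTEM") == "block"
        and props.get("ID_VENDOR_ID") == vendor
        and props.get("ID_MODEL_ID") == product
    )


# Made-up sample, shaped like `udevadm info --query=property --name=/dev/sdb1`
SAMPLE = """\
DEVNAME=/dev/sdb1
SUBSYSTEM=block
ID_VENDOR_ID=aaaa
ID_MODEL_ID=8816
"""

if __name__ == "__main__":
    print(is_card_reader(parse_properties(SAMPLE)))
```

On a real machine the sample text would be replaced by the captured output of the udevadm command for the device in question.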

Systemd

The suggestion of using Type=forking didn’t actually work for me: the process would get killed after ~120s. Type=simple, however, ran to completion.

# cat /etc/systemd/system/media-automount@.service 
[Service]
Type=simple
ExecStart=/usr/bin/media-automount %I

layers on layers

#!/bin/bash

# Log every invocation, then hand the device name(s) on unchanged.
echo "$(date) automount $*" >> /tmp/run
python /home/user/sd-mount-message.py "$@"

on layers

# cat /home/user/sd-mount-message.py 
#!/usr/bin/env python
import subprocess
import glob
import sys
import os
from sense_hat import SenseHat

DEV = sys.argv[1]
path = f'/dev/{DEV}'

if not os.path.exists(path):
    sys.exit()

sense = SenseHat()
sense.show_message("OK!", text_colour=[255, 0, 0])

subprocess.check_call(['mount', path, '/mnt/photos/'])

try:
    files = glob.glob("/mnt/photos/DCIM/**/*.JPG")
    print(files)
    for i, fn in enumerate(files):
        xo = 64 * i / len(files)
        x = int(xo % 8)
        y = int(xo // 8)
        print(f"Processing {fn} {x} {y}")

        sense.set_pixel(x, y, 0, 0, 128)
        if os.path.exists(fn):
            try:
                subprocess.check_call(['python', '/home/user/upload-to-immich.py', fn])
            except Exception as e:
                print(e)
            sense.set_pixel(x, y, 255, 0, 0)
        else:
            # The file vanished mid-run, so the card was pulled: say so, then stop.
            sense.show_message("Detached!", text_colour=[0, 0, 255])
            break
except Exception as e:
    print(e)
    sense.show_message("Error!", text_colour=[0, 0, 255])
finally:
    subprocess.check_call(['umount', '/mnt/photos/'])
    sense.show_message("Unmounted!", text_colour=[255, 0, 255])

I think I’ve not handled nearly enough edge cases here, but it works well enough for now.
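The progress bar is just the file index scaled onto the 64 pixels of the 8x8 matrix. Pulled out into a function it looks roughly like this; `progress_pixel` is a name made up for this sketch, and it uses integer arithmetic where the script uses floats, which should give the same pixels.

```python
def progress_pixel(i: int, total: int, size: int = 8) -> tuple[int, int]:
    """Map file number i (0-based) out of total onto an (x, y) pixel.

    The grid is filled left to right, top to bottom.
    """
    cell = (size * size * i) // total  # 0 .. size*size - 1
    return cell % size, cell // size


if __name__ == "__main__":
    for i in range(3):
        print(i, progress_pixel(i, 3))
```

One consequence of the scaling: with fewer than 64 photos the lit pixels are spread out with gaps between them, and with more than 64 several photos share a pixel.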

The upload-to-immich script then deletes each image from the card once it has been uploaded, and honestly it seems to work fine.

# cat /home/user/upload-to-immich.py 
#!/usr/bin/env python
import sys
import os

fn = sys.argv[1]
if not os.path.exists(fn):
    sys.exit(1)

import requests
import json
import datetime


IMMICH_PRV = "http://ip:2283"
IMMICH_PUB = "https://fqdb"
IMMICH_ALBUM = ''
IMMICH_APIKEY = ''

files = {'assetData': open(fn, 'rb')}

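try:
    # Prefer the path from DCIM onwards so the ID doesn't depend on the mount point.
    asset_id = 'olly-' + datetime.datetime.today().strftime("%Y-%m-%d-") + fn[fn.index('DCIM'):].replace('/', '-')
except ValueError:
    # No 'DCIM' in the path: fall back to the whole path.
    asset_id = 'olly-' + datetime.datetime.today().strftime("%Y-%m-%d-") + fn.replace('/', '-')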


headers = {
    'X-API-Key': IMMICH_APIKEY,
    'Accept': 'application/json',
}

def u2r(t):
    return datetime.datetime.fromtimestamp(t, tz=datetime.timezone.utc).isoformat()

params = {
    "deviceId": 'olly-ad61-42cb-a22a-f7950b95b743',
    "deviceAssetId": asset_id,
    "fileCreatedAt": u2r(os.path.getctime(fn)),
    "fileModifiedAt": u2r(os.path.getmtime(fn)),
}

print("Sending")
r = requests.post(IMMICH_PRV + '/api/assets', headers=headers, files=files, data=params)
res = r.json()
if 'error' in res:
    print(res)
    raise Exception("Error " + res['message'])

# Avoid shadowing the built-in id().
new_asset_id = res['id']

headers['content-type'] = 'application/json'
r = requests.put(IMMICH_PRV + '/api/albums/' + IMMICH_ALBUM + '/assets', headers=headers, data=json.dumps({'ids': [new_asset_id]}))
res = r.json()

if 'error' in res:
    print(res)
    raise Exception("Error " + res['message'])

# If we've gotten here it's probably safe to unlink it.
os.unlink(fn)
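The two bits of pure logic in that script, building the deviceAssetId and formatting the timestamps, are easy to try out on their own. This sketch restates them as functions; `make_asset_id` and `to_iso_utc` are names invented here, and the date is passed in rather than read from the clock so the result is predictable.

```python
import datetime


def make_asset_id(path: str, today: datetime.date, prefix: str = "olly") -> str:
    """Build an ID from the date and the path from 'DCIM' onwards."""
    marker = path.find("DCIM")
    tail = path[marker:] if marker != -1 else path  # fall back to the full path
    return f"{prefix}-{today:%Y-%m-%d}-" + tail.replace("/", "-")


def to_iso_utc(timestamp: float) -> str:
    """Format a Unix timestamp as an ISO 8601 string in UTC."""
    return datetime.datetime.fromtimestamp(
        timestamp, tz=datetime.timezone.utc
    ).isoformat()


if __name__ == "__main__":
    day = datetime.date(2024, 1, 2)
    print(make_asset_id("/mnt/photos/DCIM/100CANON/IMG_0001.JPG", day))
    print(to_iso_utc(0))
```

Because the ID includes the upload date, the same file uploaded on two different days gets two different IDs. That is fine here since the file is deleted after a successful upload, but it would matter if the delete step were ever dropped.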

Final UX

  1. Plug the card in
  2. "OK!" scrolls across the LED matrix
  3. Progress bar fills in as the photos upload
  4. Done, clean, and unmounted by the time you’ve finished making a cup of coffee