|
| 1 | +# SPDX-FileCopyrightText: 2026 John Park for Adafruit Industries |
| 2 | +# SPDX-License-Identifier: MIT |
| 3 | +""" |
| 4 | +MagTag ePaper Camera Display - receives and displays images from Adafruit IO |
| 5 | +Optimized for battery operation with deep sleep between checks |
| 6 | +""" |
| 7 | + |
| 8 | +import binascii |
| 9 | +import gc |
| 10 | +import os |
| 11 | +import ssl |
| 12 | +import time |
| 13 | +import traceback |
| 14 | +from io import BytesIO |
| 15 | + |
| 16 | +import adafruit_imageload |
| 17 | +import adafruit_requests |
| 18 | +import alarm |
| 19 | +import displayio |
| 20 | +import socketpool |
| 21 | +import wifi |
| 22 | +from adafruit_io.adafruit_io import IO_HTTP |
| 23 | +from adafruit_magtag.magtag import MagTag |
| 24 | + |
| 25 | +# ============ USER CONFIGURATION ============ |
| 26 | +# Sleep interval in seconds between image checks |
| 27 | +# Examples: 60 = 1 min, 300 = 5 min, 900 = 15 min, 3600 = 1 hour |
| 28 | +SLEEP_INTERVAL = 300 # Start with 300 seconds for testing |
| 29 | +# ============================================ |
| 30 | + |
| 31 | +print("MagTag ePaper Camera Display (Deep Sleep Mode)") |
| 32 | +print(f"Sleep interval: {SLEEP_INTERVAL} seconds") |
| 33 | + |
| 34 | +# Check if we're waking from deep sleep |
| 35 | +if alarm.wake_alarm: |
| 36 | + print("Woke from deep sleep") |
| 37 | +else: |
| 38 | + print("Cold boot - clearing sleep memory") |
| 39 | + # Clear sleep memory on cold boot |
| 40 | + alarm.sleep_memory[0:100] = b"\x00" * 100 |
| 41 | + |
| 42 | +# Initialize MagTag |
| 43 | +magtag = MagTag() |
| 44 | + |
| 45 | +# Set NeoPixels brightness and turn amber on power up |
| 46 | +magtag.peripherals.neopixels.brightness = 0.1 |
| 47 | +magtag.peripherals.neopixels.fill(0x202000) # Amber |
| 48 | + |
| 49 | +# WiFi credentials from settings.toml |
| 50 | +print(f"Connecting to {os.getenv('CIRCUITPY_WIFI_SSID')}") |
| 51 | +wifi.radio.connect( |
| 52 | + os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD") |
| 53 | +) |
| 54 | +print(f"Connected to {os.getenv('CIRCUITPY_WIFI_SSID')}!") |
| 55 | + |
| 56 | +# Turn cyan when connected to WiFi |
| 57 | +magtag.peripherals.neopixels.fill(0x002020) # Cyan |
| 58 | +time.sleep(0.3) |
| 59 | + |
| 60 | +# Set up Adafruit IO |
| 61 | +pool = socketpool.SocketPool(wifi.radio) |
| 62 | +requests = adafruit_requests.Session(pool, ssl.create_default_context()) |
| 63 | +aio_username = os.getenv("ADAFRUIT_AIO_USERNAME") |
| 64 | +aio_key = os.getenv("ADAFRUIT_AIO_KEY") |
| 65 | +io_client = IO_HTTP(aio_username, aio_key, requests) |
| 66 | + |
| 67 | +# Get the camera feed |
| 68 | +feed_camera = io_client.get_feed("epapercam") |
| 69 | + |
| 70 | +# Pre-calculate display dimensions |
| 71 | +DISPLAY_WIDTH = magtag.graphics.display.width # 296 |
| 72 | +DISPLAY_HEIGHT = magtag.graphics.display.height # 128 |
| 73 | + |
| 74 | + |
| 75 | +def get_last_timestamp(): |
| 76 | + """Retrieve the last image timestamp from sleep memory.""" |
| 77 | + try: |
| 78 | + # Read 100 bytes from sleep memory (enough for a timestamp string) |
| 79 | + stored = bytes(alarm.sleep_memory[0:100]).decode("utf-8").strip("\x00") |
| 80 | + if stored: |
| 81 | + print(f"Last stored timestamp: {stored}") |
| 82 | + return stored |
| 83 | + return None |
| 84 | + except (UnicodeDecodeError, AttributeError): |
| 85 | + return None |
| 86 | + |
| 87 | + |
| 88 | +def save_timestamp(timestamp): |
| 89 | + """Save the current image timestamp to sleep memory.""" |
| 90 | + try: |
| 91 | + # Encode timestamp as UTF-8 bytes and store in sleep memory |
| 92 | + timestamp_bytes = timestamp.encode("utf-8") |
| 93 | + # Pad with null bytes to fill 100 bytes |
| 94 | + padded = timestamp_bytes + b"\x00" * (100 - len(timestamp_bytes)) |
| 95 | + alarm.sleep_memory[0:100] = padded[:100] |
| 96 | + print(f"Saved timestamp to memory: {timestamp}") |
| 97 | + except Exception as e: # pylint: disable=broad-except |
| 98 | + print(f"Error saving timestamp: {e}") |
| 99 | + |
| 100 | + |
| 101 | +def scale_and_crop_image(src_bitmap, src_y_offset, scale, img_palette): |
| 102 | + """Scale and crop the source bitmap to display size.""" |
| 103 | + # Create display-sized bitmap |
| 104 | + display_bitmap = displayio.Bitmap(DISPLAY_WIDTH, DISPLAY_HEIGHT, len(img_palette)) |
| 105 | + |
| 106 | + # Scale and crop the image |
| 107 | + for y in range(DISPLAY_HEIGHT): |
| 108 | + src_y = int((y + src_y_offset) / scale) |
| 109 | + if src_y >= src_bitmap.height: |
| 110 | + src_y = src_bitmap.height - 1 |
| 111 | + for x in range(DISPLAY_WIDTH): |
| 112 | + src_x = int(x / scale) |
| 113 | + if src_x >= src_bitmap.width: |
| 114 | + src_x = src_bitmap.width - 1 |
| 115 | + display_bitmap[x, y] = src_bitmap[src_x, src_y] |
| 116 | + |
| 117 | + return display_bitmap |
| 118 | + |
| 119 | + |
| 120 | +def fetch_and_display_epaper_cam_image(data): |
| 121 | + """Display ePaper Camera image from already-fetched Adafruit IO data.""" |
| 122 | + try: |
| 123 | + # Force garbage collection before processing |
| 124 | + gc.collect() |
| 125 | + print(f"Free memory before processing: {gc.mem_free()} bytes") |
| 126 | + |
| 127 | + # Turn on NeoPixels to indicate processing |
| 128 | + magtag.peripherals.neopixels.fill(0x202020) # Dim white |
| 129 | + |
| 130 | + print("Processing image...") |
| 131 | + base64_image = data["value"] |
| 132 | + print(f"Received image data: {len(base64_image)} bytes (base64)") |
| 133 | + |
| 134 | + # Decode base64 to get GIF binary data |
| 135 | + gif_data = binascii.a2b_base64(base64_image) |
| 136 | + print(f"Decoded GIF: {len(gif_data)} bytes") |
| 137 | + |
| 138 | + # Free base64_image |
| 139 | + base64_image = None |
| 140 | + gc.collect() |
| 141 | + |
| 142 | + # Create a BytesIO wrapper for the GIF data |
| 143 | + gif_stream = BytesIO(gif_data) |
| 144 | + |
| 145 | + # Load with adafruit_imageload (supports GIF) |
| 146 | + bitmap, palette = adafruit_imageload.load( |
| 147 | + gif_stream, bitmap=displayio.Bitmap, palette=displayio.Palette |
| 148 | + ) |
| 149 | + print(f"Loaded image: {bitmap.width}x{bitmap.height}") |
| 150 | + |
| 151 | + # Free gif_data and gif_stream |
| 152 | + gif_data = None |
| 153 | + gif_stream = None |
| 154 | + gc.collect() |
| 155 | + |
| 156 | + # Calculate scale factor to fill display width |
| 157 | + scale = DISPLAY_WIDTH / bitmap.width |
| 158 | + scaled_height = int(bitmap.height * scale) |
| 159 | + |
| 160 | + print(f"Scaling to: {DISPLAY_WIDTH}x{scaled_height} (scale={scale:.2f})") |
| 161 | + |
| 162 | + # Calculate vertical offset for centering (if scaled image is taller than display) |
| 163 | + src_y_offset = ( |
| 164 | + (scaled_height - DISPLAY_HEIGHT) // 2 |
| 165 | + if scaled_height > DISPLAY_HEIGHT |
| 166 | + else 0 |
| 167 | + ) |
| 168 | + |
| 169 | + # Scale and crop the image using helper function |
| 170 | + display_bitmap = scale_and_crop_image(bitmap, src_y_offset, scale, palette) |
| 171 | + |
| 172 | + # Free original bitmap after scaling |
| 173 | + bitmap = None |
| 174 | + gc.collect() |
| 175 | + |
| 176 | + # Create a TileGrid for display |
| 177 | + tile_grid = displayio.TileGrid(display_bitmap, pixel_shader=palette, x=0, y=0) |
| 178 | + |
| 179 | + # Create a group and add the tile grid |
| 180 | + group = displayio.Group() |
| 181 | + group.append(tile_grid) |
| 182 | + |
| 183 | + # Display on the MagTag |
| 184 | + magtag.graphics.display.root_group = group |
| 185 | + |
| 186 | + print("Refreshing display...") |
| 187 | + magtag.graphics.display.refresh() |
| 188 | + |
| 189 | + print("Image displayed on MagTag!") |
| 190 | + |
| 191 | + # Turn off NeoPixels after successful display |
| 192 | + magtag.peripherals.neopixels.fill(0x000000) |
| 193 | + |
| 194 | + # Final garbage collection |
| 195 | + gc.collect() |
| 196 | + print(f"Free memory after processing: {gc.mem_free()} bytes") |
| 197 | + |
| 198 | + return True # Success |
| 199 | + |
| 200 | + except Exception as err: # pylint: disable=broad-except |
| 201 | + # Turn NeoPixels red to indicate error |
| 202 | + magtag.peripherals.neopixels.fill(0x200000) # Dim red |
| 203 | + time.sleep(2) |
| 204 | + magtag.peripherals.neopixels.fill(0x000000) |
| 205 | + |
| 206 | + print(f"Error: {err}") |
| 207 | + traceback.print_exception(type(err), err, err.__traceback__) |
| 208 | + |
| 209 | + # Garbage collection on error |
| 210 | + gc.collect() |
| 211 | + |
| 212 | + return False # Failure |
| 213 | + |
| 214 | + |
| 215 | +# Main execution - check for new image then deep sleep |
| 216 | +try: |
| 217 | + # Turn green while checking for new images |
| 218 | + magtag.peripherals.neopixels.fill(0x002000) # Green |
| 219 | + |
| 220 | + # Get last known timestamp |
| 221 | + last_timestamp = get_last_timestamp() |
| 222 | + |
| 223 | + # Fetch metadata to check timestamp |
| 224 | + print("Checking for new image...") |
| 225 | + feed_data = io_client.receive_data(feed_camera["key"]) |
| 226 | + current_timestamp = feed_data.get("updated_at") |
| 227 | + |
| 228 | + print(f"Current image timestamp: {current_timestamp}") |
| 229 | + |
| 230 | + # Compare timestamps |
| 231 | + if current_timestamp != last_timestamp: |
| 232 | + print("New image detected! Downloading and displaying...") |
| 233 | + success = fetch_and_display_epaper_cam_image(feed_data) |
| 234 | + |
| 235 | + if success: |
| 236 | + # Save the new timestamp |
| 237 | + save_timestamp(current_timestamp) |
| 238 | + else: |
| 239 | + print("Display failed - not updating stored timestamp") |
| 240 | + else: |
| 241 | + print("No new image - skipping download to save battery") |
| 242 | + # Turn off green LED since we're not doing anything |
| 243 | + magtag.peripherals.neopixels.fill(0x000000) |
| 244 | + |
| 245 | + print(f"Going to deep sleep for {SLEEP_INTERVAL} seconds...") |
| 246 | + time.sleep(1) # Brief delay to see the message |
| 247 | + |
| 248 | + # Create a time alarm for waking up |
| 249 | + time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + SLEEP_INTERVAL) |
| 250 | + |
| 251 | + # Enter deep sleep - this will restart the program when it wakes |
| 252 | + alarm.exit_and_deep_sleep_until_alarms(time_alarm) |
| 253 | + |
| 254 | +except Exception as main_err: # pylint: disable=broad-except |
| 255 | + print(f"Error in main execution: {main_err}") |
| 256 | + traceback.print_exception(type(main_err), main_err, main_err.__traceback__) |
| 257 | + |
| 258 | + # Even on error, go to sleep to preserve battery |
| 259 | + magtag.peripherals.neopixels.fill(0x200000) # Red error indicator |
| 260 | + time.sleep(2) |
| 261 | + magtag.peripherals.neopixels.fill(0x000000) |
| 262 | + |
| 263 | + print(f"Error - sleeping for {SLEEP_INTERVAL} seconds...") |
| 264 | + time.sleep(1) |
| 265 | + |
| 266 | + time_alarm = alarm.time.TimeAlarm(monotonic_time=time.monotonic() + SLEEP_INTERVAL) |
| 267 | + alarm.exit_and_deep_sleep_until_alarms(time_alarm) |
0 commit comments