diff --git a/resources/spyglass.conf b/resources/spyglass.conf index b68e532..23b18bb 100644 --- a/resources/spyglass.conf +++ b/resources/spyglass.conf @@ -83,3 +83,38 @@ CONTROLS="" #### NOTE: Name of the file to be used to apply tuning filter. #### If dir not defined, default pycamera2 directories will be used. # TUNING_FILTER="ov5647_noir.json" + +#### MJPEG encoder linger (INTEGER)[default: -1] +#### NOTE: Seconds the MJPEG encoder (and the camera, when no other +#### encoder is active) keeps running after the last consumer +#### disconnects. Use 0 or a small positive value to reduce idle +#### CPU at the cost of cold-start latency on the next /snapshot +#### or /stream. +#### -1 keeps the encoder running once started; spyglass +#### pre-warms it at startup. Preserves the legacy "always +#### on" behavior and the lowest /snapshot latency (e.g. +#### for timelapses). +#### 0 stops the encoder immediately when the last consumer +#### disconnects. Lowest idle CPU. +#### > 0 stops the encoder N seconds after the last consumer +#### disconnects; a fresh request within the window cancels +#### the stop. Bridges brief reconnects without paying +#### cold-start latency on each one. +# MJPEG_LINGER_SECONDS="-1" + +#### WebRTC encoder linger (INTEGER)[default: 5] +#### NOTE: Seconds the WebRTC (H264) encoder (and the camera, when no +#### other encoder is active) keeps running after the last peer +#### disconnects. Use 0 or a small positive value to reduce idle +#### CPU at the cost of cold-start latency on the next peer +#### connection. +#### -1 keeps the encoder running once started; spyglass +#### pre-warms it at startup. +#### 0 stops the encoder immediately when the last peer +#### disconnects. Lowest idle CPU; every new peer pays +#### cold-start latency. +#### > 0 (default: 5) stops the encoder N seconds after the +#### last peer disconnects; a fresh connection within the +#### window cancels the stop. Bridges brief reconnects +#### without paying cold-start latency on each one. +# WEBRTC_LINGER_SECONDS="5" diff --git a/scripts/spyglass b/scripts/spyglass index 59c3217..bf7ee1c 100755 --- a/scripts/spyglass +++ b/scripts/spyglass @@ -121,6 +121,8 @@ run_spyglass() { --orientation_exif "${ORIENTATION_EXIF:-h}" \ --tuning_filter "${TUNING_FILTER:-}"\ --tuning_filter_dir "${TUNING_FILTER_DIR:-}" \ + --mjpeg-linger-seconds "${MJPEG_LINGER_SECONDS:--1}" \ + --webrtc-linger-seconds "${WEBRTC_LINGER_SECONDS:-5}" \ --controls-string "${CONTROLS:-0=0}" # 0=0 to prevent error on empty string } diff --git a/spyglass/camera/camera.py b/spyglass/camera/camera.py index e28b4c3..92504a9 100644 --- a/spyglass/camera/camera.py +++ b/spyglass/camera/camera.py @@ -105,6 +105,8 @@ def start_and_run_server( webrtc_url="/webrtc", orientation_exif=0, use_sw_encoding=False, + mjpeg_linger_seconds=-1, + webrtc_linger_seconds=5, ): pass diff --git a/spyglass/camera/csi.py b/spyglass/camera/csi.py index 440ea71..9cc7a46 100644 --- a/spyglass/camera/csi.py +++ b/spyglass/camera/csi.py @@ -5,6 +5,7 @@ from picamera2.outputs import FileOutput from spyglass import WEBRTC_ENABLED, camera +from spyglass.camera.lazy_encoder import CameraSession, LazyEncoder from spyglass.server.http_server import StreamingHandler @@ -18,6 +19,8 @@ def start_and_run_server( webrtc_url="/webrtc", orientation_exif=0, use_sw_encoding=False, + mjpeg_linger_seconds=-1, + webrtc_linger_seconds=5, ): if _hw_encoder_available and not use_sw_encoding: from picamera2.encoders import MJPEGEncoder @@ -41,12 +44,33 @@ def get_frame(inner_self): output.condition.wait() return output.frame - self.picam2.start_encoder(MJPEGEncoder(), FileOutput(output)) + session = CameraSession(self.picam2) + mjpeg_encoder = LazyEncoder( + self.picam2, + MJPEGEncoder, + FileOutput(output), + session=session, + linger_seconds=mjpeg_linger_seconds, + ) + StreamingHandler.mjpeg_encoder = mjpeg_encoder if WEBRTC_ENABLED: from picamera2.encoders import H264Encoder - self.picam2.start_encoder(H264Encoder(), self.media_track) - self.picam2.start() + h264_encoder = LazyEncoder( + self.picam2, + H264Encoder, + self.media_track, + session=session, + linger_seconds=webrtc_linger_seconds, + ) + StreamingHandler.h264_encoder = h264_encoder + else: + StreamingHandler.h264_encoder = None + + if mjpeg_linger_seconds < 0: + mjpeg_encoder.acquire() + if WEBRTC_ENABLED and webrtc_linger_seconds < 0: + h264_encoder.acquire() self._run_server( bind_address, @@ -60,4 +84,11 @@ def get_frame(inner_self): ) def stop(self): - self.picam2.stop_recording() + try: + self.picam2.stop_encoder() + except Exception: + pass + try: + self.picam2.stop() + except Exception: + pass diff --git a/spyglass/camera/lazy_encoder.py b/spyglass/camera/lazy_encoder.py new file mode 100644 index 0000000..54d4a01 --- /dev/null +++ b/spyglass/camera/lazy_encoder.py @@ -0,0 +1,156 @@ +"""Reference-counted lazy start/stop wrappers for picamera2. + +CameraSession wraps Picamera2.start()/stop(): the camera only runs while +at least one consumer (encoder) holds a reference. + +LazyEncoder wraps Picamera2.start_encoder()/stop_encoder(): the encoder +only runs while at least one consumer (HTTP stream / snapshot / WebRTC +peer connection) holds a reference. Each LazyEncoder also holds a +reference on the CameraSession while running, so the camera itself +turns off when no encoders are active. + +LazyEncoder supports a ``linger_seconds`` parameter: + +* ``< 0`` keeps the encoder running once started; subsequent releases that + drive the ref-count to zero do not stop it. Useful for the MJPEG path + when paired with a startup pre-warm so e.g. timelapse snapshots stay on + the warm path. +* ``0`` stops the encoder immediately when the last consumer releases. +* ``> 0`` schedules a delayed stop; a fresh acquire within the window + cancels the pending stop. Useful to bridge brief reconnects without + paying the cold-start cost on every reconnect. +""" + +import threading + + +class CameraSession: + def __init__(self, picam2): + self._picam2 = picam2 + self._refs = 0 + self._lock = threading.Lock() + + def acquire(self): + with self._lock: + self._refs += 1 + if self._refs > 1: + return + try: + self._picam2.start() + except Exception: + self._refs -= 1 + raise + + def release(self): + with self._lock: + if self._refs == 0: + return + self._refs -= 1 + if self._refs == 0: + self._picam2.stop() + + +class LazyEncoder: + def __init__( + self, + picam2, + encoder_factory, + output, + session=None, + linger_seconds=0, + ): + """ + :param picam2: the Picamera2 instance to start/stop the encoder on. + :param encoder_factory: zero-arg callable returning a fresh Encoder. + :param output: the picamera2 Output to attach to the encoder. + :param session: optional CameraSession. If provided, the camera is + started/stopped together with the encoder so the camera only runs + when at least one encoder is active. + :param linger_seconds: behavior when the last consumer releases. ``0`` + stops immediately; ``>0`` schedules a stop that is cancelled if a + new consumer acquires within the window; ``<0`` keeps the encoder + running forever after the first start. + """ + self._picam2 = picam2 + self._encoder_factory = encoder_factory + self._output = output + self._session = session + self._linger_seconds = linger_seconds + self._encoder = None + self._refs = 0 + self._lock = threading.Lock() + self._stop_timer = None + self._stop_token = 0 + + def acquire(self): + with self._lock: + self._cancel_linger_locked() + self._refs += 1 + if self._encoder is not None: + return + session_acquired = False + try: + if self._session is not None: + self._session.acquire() + session_acquired = True + self._encoder = self._encoder_factory() + self._picam2.start_encoder(self._encoder, self._output) + except Exception: + self._refs -= 1 + self._encoder = None + if session_acquired and self._session is not None: + self._session.release() + raise + + def release(self): + with self._lock: + if self._refs == 0: + return + self._refs -= 1 + if self._refs > 0 or self._linger_seconds < 0: + return + if self._linger_seconds == 0: + self._stop_now_locked() + else: + self._schedule_linger_locked() + + def _stop_now_locked(self): + encoder = self._encoder + self._encoder = None + try: + self._picam2.stop_encoder(encoder) + finally: + if self._session is not None: + self._session.release() + + def _cancel_linger_locked(self): + if self._stop_timer is None: + return + self._stop_timer.cancel() + self._stop_timer = None + self._stop_token += 1 + + def _schedule_linger_locked(self): + self._stop_token += 1 + token = self._stop_token + timer = threading.Timer( + self._linger_seconds, self._linger_callback, args=(token,) + ) + timer.daemon = True + self._stop_timer = timer + timer.start() + + def _linger_callback(self, token): + with self._lock: + if self._stop_token != token: + return + self._stop_timer = None + if self._refs == 0 and self._encoder is not None: + self._stop_now_locked() + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, *exc): + self.release() diff --git a/spyglass/camera/usb.py b/spyglass/camera/usb.py index 6a0e7c6..4a87a27 100644 --- a/spyglass/camera/usb.py +++ b/spyglass/camera/usb.py @@ -12,6 +12,8 @@ def start_and_run_server( webrtc_url="/webrtc", orientation_exif=0, use_sw_encoding=False, + mjpeg_linger_seconds=-1, + webrtc_linger_seconds=5, ): def get_frame(inner_self): # TODO: Cuts framerate in 1/n with n streams open, add some kind of buffer diff --git a/spyglass/cli.py b/spyglass/cli.py index b88b70f..160ac85 100644 --- a/spyglass/cli.py +++ b/spyglass/cli.py @@ -100,6 +100,8 @@ def main(args=None): parsed_args.webrtc_url, parsed_args.orientation_exif, use_sw_encoding, + parsed_args.mjpeg_linger_seconds, + parsed_args.webrtc_linger_seconds, ) finally: cam.stop() @@ -346,6 +348,45 @@ def get_parser(): action="store_true", help="List available camera controls and exits.", ) + parser.add_argument( + "--mjpeg-linger-seconds", + type=int, + default=-1, + help="How long the MJPEG encoder (and the camera, when no other " + "encoder is active) keeps running after the last consumer " + "disconnects. Use 0 or a small positive value to free encoder and " + "camera resources while idle, reducing CPU use at the cost of " + "cold-start latency on the next /snapshot or /stream.\n" + " -1 (default) keeps the encoder running once started; spyglass " + "pre-warms it at startup. Preserves the legacy 'always on' " + "behavior and the lowest /snapshot latency (e.g. for timelapse " + "use cases).\n" + " 0 stops the encoder immediately when the last consumer " + "disconnects. Lowest idle CPU.\n" + " > 0 stops the encoder N seconds after the last consumer " + "disconnects; a fresh request within the window cancels the stop. " + "Bridges brief reconnects without paying cold-start latency on " + "each one.", + ) + parser.add_argument( + "--webrtc-linger-seconds", + type=int, + default=5, + help="How long the WebRTC (H264) encoder (and the camera, when no " + "other encoder is active) keeps running after the last peer " + "disconnects. Use 0 or a small positive value to free encoder and " + "camera resources while idle, reducing CPU use at the cost of " + "cold-start latency on the next peer connection.\n" + " -1 keeps the encoder running once started; spyglass pre-warms " + "it at startup.\n" + " 0 stops the encoder immediately when the last peer " + "disconnects. Lowest idle CPU; every new peer pays cold-start " + "latency.\n" + " > 0 (default: 5) stops the encoder N seconds after the last " + "peer disconnects; a fresh connection within the window cancels " + "the stop. Bridges brief reconnects without paying cold-start " + "latency on each one.", + ) camera_group = parser.add_mutually_exclusive_group() camera_group.add_argument( "-n", diff --git a/spyglass/server/jpeg.py b/spyglass/server/jpeg.py index 78bdb8e..12e2ba7 100644 --- a/spyglass/server/jpeg.py +++ b/spyglass/server/jpeg.py @@ -10,6 +10,9 @@ def start_streaming(handler: "StreamingHandler"): + encoder = getattr(handler, "mjpeg_encoder", None) + if encoder is not None: + encoder.acquire() try: send_default_headers(handler) handler.send_header("Content-Type", "multipart/x-mixed-replace; boundary=FRAME") @@ -30,9 +33,15 @@ def start_streaming(handler: "StreamingHandler"): logger.warning( "Removed streaming client %s: %s", handler.client_address, str(e) ) + finally: + if encoder is not None: + encoder.release() def send_snapshot(handler: "StreamingHandler"): + encoder = getattr(handler, "mjpeg_encoder", None) + if encoder is not None: + encoder.acquire() try: send_default_headers(handler) frame = handler.get_frame() @@ -45,6 +54,9 @@ def send_snapshot(handler: "StreamingHandler"): handler.wfile.write(frame[2:]) except Exception as e: logger.warning("Removed client %s: %s", handler.client_address, str(e)) + finally: + if encoder is not None: + encoder.release() def send_default_headers(handler: "StreamingHandler"): diff --git a/spyglass/server/webrtc_whep.py b/spyglass/server/webrtc_whep.py index 0f0c0e9..0e2c1a3 100644 --- a/spyglass/server/webrtc_whep.py +++ b/spyglass/server/webrtc_whep.py @@ -77,46 +77,66 @@ async def do_POST_async(handler: "StreamingHandler"): pc = RTCPeerConnection() secret = uuid.uuid4() + h264_encoder = getattr(handler, "h264_encoder", None) + encoder_acquired = False + + def _cleanup(): + nonlocal encoder_acquired + pcs.pop(str(secret), None) + if h264_encoder is not None and encoder_acquired: + encoder_acquired = False + h264_encoder.release() + @pc.on("connectionstatechange") async def on_connectionstatechange(): print(f"Connection state {pc.connectionState}") if pc.connectionState == "failed": await pc.close() elif pc.connectionState == "closed": - pcs.pop(str(secret)) + _cleanup() print(f"{len(pcs)} connections still open.") - pcs[str(secret)] = pc - track = media_relay.subscribe(handler.media_track) - sender = pc.addTrack(track) - codecs = RTCRtpSender.getCapabilities("video").codecs - transceiver = next(t for t in pc.getTransceivers() if t.sender == sender) - transceiver.setCodecPreferences( - [codec for codec in codecs if codec.mimeType == "video/H264"] - ) - - await pc.setRemoteDescription(offer) - answer = await pc.createAnswer() - await pc.setLocalDescription(answer) - - while pc.iceGatheringState != "complete": - await asyncio.sleep(1) - - send_default_headers(HTTPStatus.CREATED, handler) - - handler.send_header("Content-Type", "application/sdp") - handler.send_header("ETag", "*") - - handler.send_header("ID", secret) - handler.send_header( - "Access-Control-Expose-Headers", "ETag, ID, Accept-Patch, Link, Location" - ) - handler.send_header("Accept-Patch", "application/trickle-ice-sdpfrag") - handler.headers["Link"] = get_ICE_servers() - handler.send_header("Location", f"/whep/{secret}") - handler.send_header("Content-Length", len(pc.localDescription.sdp)) - handler.end_headers() - handler.wfile.write(bytes(pc.localDescription.sdp, "utf-8")) + try: + if h264_encoder is not None: + h264_encoder.acquire() + encoder_acquired = True + + pcs[str(secret)] = pc + + track = media_relay.subscribe(handler.media_track) + sender = pc.addTrack(track) + codecs = RTCRtpSender.getCapabilities("video").codecs + transceiver = next(t for t in pc.getTransceivers() if t.sender == sender) + transceiver.setCodecPreferences( + [codec for codec in codecs if codec.mimeType == "video/H264"] + ) + + await pc.setRemoteDescription(offer) + answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + + while pc.iceGatheringState != "complete": + await asyncio.sleep(1) + + send_default_headers(HTTPStatus.CREATED, handler) + + handler.send_header("Content-Type", "application/sdp") + handler.send_header("ETag", "*") + + handler.send_header("ID", secret) + handler.send_header( + "Access-Control-Expose-Headers", "ETag, ID, Accept-Patch, Link, Location" + ) + handler.send_header("Accept-Patch", "application/trickle-ice-sdpfrag") + handler.headers["Link"] = get_ICE_servers() + handler.send_header("Location", f"/whep/{secret}") + handler.send_header("Content-Length", len(pc.localDescription.sdp)) + handler.end_headers() + handler.wfile.write(bytes(pc.localDescription.sdp, "utf-8")) + except Exception: + _cleanup() + await pc.close() + raise async def do_PATCH_async(streaming_handler: "StreamingHandler"): diff --git a/tests/test_cli.py b/tests/test_cli.py index 925f6df..80550a4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -22,6 +22,8 @@ DEFAULT_TUNING_FILTER = None DEFAULT_TUNING_FILTER_DIR = None DEFAULT_CAMERA_NUM = 0 +DEFAULT_MJPEG_LINGER_SECONDS = -1 +DEFAULT_WEBRTC_LINGER_SECONDS = 5 mock_libcamera = MagicMock() @@ -206,7 +208,15 @@ def test_run_server_with_configuration_from_arguments(mock_init_camera): ) cam_instance = mock_init_camera.return_value cam_instance.start_and_run_server.assert_called_once_with( - "1.2.3.4", 1234, "streaming-url", "snapshot-url", "webrtc-url", 1, True + "1.2.3.4", + 1234, + "streaming-url", + "snapshot-url", + "webrtc-url", + 1, + True, + DEFAULT_MJPEG_LINGER_SECONDS, + DEFAULT_WEBRTC_LINGER_SECONDS, ) @@ -253,4 +263,6 @@ def test_run_server_with_orientation(mock_init_camera, input_value, expected_out "webrtc-url", expected_output, True, + DEFAULT_MJPEG_LINGER_SECONDS, + DEFAULT_WEBRTC_LINGER_SECONDS, ) diff --git a/tests/test_lazy_encoder.py b/tests/test_lazy_encoder.py new file mode 100644 index 0000000..05b9c45 --- /dev/null +++ b/tests/test_lazy_encoder.py @@ -0,0 +1,195 @@ +import time +from unittest.mock import MagicMock + +import pytest + +from spyglass.camera.lazy_encoder import CameraSession, LazyEncoder + + +def _make(picam2=None, *, session=None, linger_seconds=0): + if picam2 is None: + picam2 = MagicMock() + if session is None: + session = CameraSession(picam2) + encoder_factory = MagicMock() + output = MagicMock() + lazy = LazyEncoder( + picam2, + encoder_factory, + output, + session=session, + linger_seconds=linger_seconds, + ) + return lazy, picam2, session, encoder_factory, output + + +def test_first_acquire_starts_session_then_encoder(): + lazy, picam2, _, factory, output = _make() + lazy.acquire() + + picam2.start.assert_called_once_with() + picam2.start_encoder.assert_called_once_with(factory.return_value, output) + + +def test_extra_acquires_share_encoder_and_session(): + lazy, picam2, _, _, _ = _make() + lazy.acquire() + lazy.acquire() + lazy.acquire() + + picam2.start.assert_called_once() + picam2.start_encoder.assert_called_once() + + +def test_release_with_linger_zero_stops_immediately(): + lazy, picam2, _, _, _ = _make(linger_seconds=0) + lazy.acquire() + lazy.release() + + picam2.stop_encoder.assert_called_once() + picam2.stop.assert_called_once_with() + + +def test_release_with_linger_negative_keeps_running(): + lazy, picam2, _, _, _ = _make(linger_seconds=-1) + lazy.acquire() + lazy.release() + + picam2.stop_encoder.assert_not_called() + picam2.stop.assert_not_called() + + +def test_linger_negative_subsequent_acquire_does_not_restart_encoder(): + lazy, picam2, _, _, _ = _make(linger_seconds=-1) + lazy.acquire() + lazy.release() + lazy.acquire() + + picam2.start.assert_called_once() + picam2.start_encoder.assert_called_once() + + +def test_release_with_positive_linger_does_not_stop_immediately(): + lazy, picam2, _, _, _ = _make(linger_seconds=10) + lazy.acquire() + lazy.release() + + assert lazy._stop_timer is not None + picam2.stop_encoder.assert_not_called() + picam2.stop.assert_not_called() + + lazy._stop_timer.cancel() # clean up so the test process exits cleanly + + +def test_acquire_cancels_pending_linger_stop(): + lazy, picam2, _, _, _ = _make(linger_seconds=10) + lazy.acquire() + lazy.release() + assert lazy._stop_timer is not None + + lazy.acquire() + assert lazy._stop_timer is None + picam2.stop_encoder.assert_not_called() + + +def test_positive_linger_eventually_stops(): + lazy, picam2, _, _, _ = _make(linger_seconds=0.05) + lazy.acquire() + lazy.release() + + deadline = time.monotonic() + 1.0 + while time.monotonic() < deadline and picam2.stop_encoder.call_count == 0: + time.sleep(0.01) + + picam2.stop_encoder.assert_called_once() + picam2.stop.assert_called_once() + + +def test_release_at_zero_refs_is_noop(): + lazy, picam2, _, _, _ = _make(linger_seconds=0) + lazy.release() # never acquired + + picam2.start.assert_not_called() + picam2.stop_encoder.assert_not_called() + + +def test_acquire_rolls_back_when_start_encoder_fails(): + picam2 = MagicMock() + picam2.start_encoder.side_effect = RuntimeError("boom") + lazy, _, session, _, _ = _make(picam2=picam2, linger_seconds=0) + + with pytest.raises(RuntimeError): + lazy.acquire() + + # Both refs rolled back; camera was stopped via session.release(). + assert session._refs == 0 + picam2.start.assert_called_once() + picam2.stop.assert_called_once() + + # A retry succeeds. + picam2.start_encoder.side_effect = None + lazy.acquire() + assert picam2.start_encoder.call_count == 2 + + +def test_release_still_releases_session_if_stop_encoder_raises(): + picam2 = MagicMock() + picam2.stop_encoder.side_effect = RuntimeError("boom") + lazy, _, session, _, _ = _make(picam2=picam2, linger_seconds=0) + + lazy.acquire() + with pytest.raises(RuntimeError): + lazy.release() + + assert session._refs == 0 + picam2.stop.assert_called_once() + + +def test_two_encoders_share_one_session(): + picam2 = MagicMock() + session = CameraSession(picam2) + mjpeg, _, _, _, _ = _make(picam2=picam2, session=session, linger_seconds=0) + h264, _, _, _, _ = _make(picam2=picam2, session=session, linger_seconds=0) + + mjpeg.acquire() + h264.acquire() + picam2.start.assert_called_once() + + mjpeg.release() + picam2.stop.assert_not_called() + + h264.release() + picam2.stop.assert_called_once() + + +def test_stale_timer_callback_after_cancel_is_ignored(): + lazy, picam2, _, _, _ = _make(linger_seconds=0.05) + lazy.acquire() + lazy.release() + # Acquire before the timer can fire to cancel the stop. + lazy.acquire() + + # Give a stale callback time to try to run. + time.sleep(0.15) + + picam2.stop_encoder.assert_not_called() + picam2.stop.assert_not_called() + + +def test_reacquire_after_full_stop_starts_again(): + lazy, picam2, _, _, _ = _make(linger_seconds=0) + lazy.acquire() + lazy.release() + lazy.acquire() + + assert picam2.start.call_count == 2 + assert picam2.start_encoder.call_count == 2 + + +def test_context_manager_acquires_and_releases(): + lazy, picam2, _, _, _ = _make(linger_seconds=0) + with lazy: + picam2.start_encoder.assert_called_once() + picam2.stop_encoder.assert_not_called() + + picam2.stop_encoder.assert_called_once()