别再只调RTSP了!Python + ONVIF 实现摄像头抓图、预置点巡航的自动化脚本教程

张开发
2026/4/22 12:03:47 15 分钟阅读
别再只调RTSP了!Python + ONVIF 实现摄像头抓图、预置点巡航的自动化脚本教程
PythonONVIF实战突破RTSP限制打造智能摄像头控制系统监控摄像头早已不再是简单的看设备而是智能化管理的重要节点。许多开发者习惯使用RTSP协议获取视频流却忽略了ONVIF协议在设备控制方面的强大能力。本文将带你深入探索如何用PythonONVIF构建一个功能完备的摄像头控制系统实现定时抓图、预置点巡航等高级功能。1. ONVIF协议的核心价值与应用场景RTSP协议解决了视频流的获取问题但在实际项目中我们往往需要对摄像头进行更精细化的控制。这正是ONVIF协议的用武之地——它提供了一套标准化的网络视频设备控制接口。ONVIF的核心优势主要体现在三个方面设备发现与配置自动发现网络中的兼容设备获取设备信息和服务端点PTZ控制实现云台摄像头的平移、倾斜和变焦操作事件处理接收和处理设备触发的事件如移动侦测、输入触发等在实际应用中ONVIF特别适合以下场景智能巡检系统定时控制摄像头转动到预设位置进行画面采集安防监控当传感器触发时自动调整摄像头角度捕捉关键画面视频分析在不同预置点间切换以获取最佳分析视角提示ONVIF协议基于SOAP实现使用WSDL定义服务接口这为不同厂商的设备提供了统一的控制方式。2. 环境搭建与基础配置2.1 Python ONVIF库的选择与安装Python生态中有多个ONVIF客户端库可供选择当前最主流的是python-onvif-zeeppip install onvif-zeep这个库基于zeep SOAP客户端实现支持Python 3.x版本。如果你仍在使用Python 2.x则需要安装旧版的python-onvif。2.2 摄像头准备工作在使用ONVIF控制摄像头前需要确保摄像头支持ONVIF协议大多数主流品牌都支持ONVIF功能已在摄像头设置中启用创建了具有ONVIF权限的用户账户常见品牌摄像头的ONVIF默认端口品牌默认端口备注Hikvision80部分型号使用8000Dahua80Axis80Bosch802.3 基础连接测试以下代码展示了如何建立与摄像头的基础连接from onvif import ONVIFCamera def connect_camera(ip, port, username, password): try: mycam ONVIFCamera(ip, port, username, password) media_service mycam.create_media_service() profiles media_service.GetProfiles() print(f成功连接到摄像头找到{len(profiles)}个配置档) return mycam except Exception as e: print(f连接失败: {str(e)}) return None3. 核心功能实现3.1 定时抓图功能RTSP可以获取视频流但ONVIF提供了更直接的抓图接口。以下是实现定时抓图的完整方案import time from datetime import datetime from onvif import ONVIFCamera class CameraController: def __init__(self, ip, port, username, password): self.camera ONVIFCamera(ip, port, username, password) self.media self.camera.create_media_service() self.media_profile self.media.GetProfiles()[0] def capture_image(self, save_pathNone): 抓取当前画面并保存 if not save_path: timestamp datetime.now().strftime(%Y%m%d_%H%M%S) save_path fcapture_{timestamp}.jpg request self.media.create_type(GetSnapshotUri) request.ProfileToken self.media_profile.token snapshot_uri self.media.GetSnapshotUri(request) # 使用requests下载图片 import requests from requests.auth import HTTPDigestAuth response requests.get( snapshot_uri.Uri, authHTTPDigestAuth(self.camera.username, self.camera.password) ) with open(save_path, wb) as f: f.write(response.content) return save_path # 定时抓图示例 controller CameraController(192.168.1.64, 80, admin, password) for i in range(5): # 每隔10秒抓一次图共5次 controller.capture_image() time.sleep(10)3.2 预置点管理与调用预置点是摄像头控制中最实用的功能之一它允许你保存和快速调用特定的视角位置。class PTZController: def __init__(self, camera): self.ptz camera.create_ptz_service() self.media_profile camera.create_media_service().GetProfiles()[0] def get_presets(self): 获取所有预置点 request self.ptz.create_type(GetPresets) request.ProfileToken self.media_profile.token return self.ptz.GetPresets(request) def goto_preset(self, preset_token): 移动到指定预置点 request self.ptz.create_type(GotoPreset) request.ProfileToken self.media_profile.token request.PresetToken preset_token self.ptz.GotoPreset(request) def set_preset(self, preset_name): 设置当前为预置点 request self.ptz.create_type(SetPreset) request.ProfileToken self.media_profile.token request.PresetName preset_name return self.ptz.SetPreset(request)3.3 自动巡航实现结合预置点和定时功能可以实现摄像头自动巡航import schedule import time def preset_cruise(controller, interval30): 预置点巡航 presets controller.get_presets() current_index 0 def move_to_next(): nonlocal current_index if not presets: print(没有找到预置点) return preset presets[current_index % len(presets)] print(f移动到预置点: {preset.Name}) controller.goto_preset(preset.token) controller.capture_image(fpreset_{preset.Name}_{int(time.time())}.jpg) current_index 1 # 设置定时任务 schedule.every(interval).seconds.do(move_to_next) print(f开始预置点巡航每{interval}秒切换一次) while True: schedule.run_pending() time.sleep(1)4. 高级功能与优化4.1 异常处理与重试机制网络设备控制中稳定的异常处理至关重要from functools import wraps from time import sleep def retry(max_attempts3, delay1): 重试装饰器 def decorator(func): wraps(func) def wrapper(*args, **kwargs): last_exception None for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: last_exception e print(f尝试 {attempt 1} 失败: {str(e)}) sleep(delay) raise last_exception or Exception(未知错误) return wrapper return decorator class RobustCameraController(CameraController): retry(max_attempts3, delay2) def capture_image(self, save_pathNone): return super().capture_image(save_path) retry(max_attempts2, delay1) def goto_preset(self, preset_token): return super().goto_preset(preset_token)4.2 性能优化技巧连接复用避免频繁创建和销毁服务对象异步操作使用asyncio提高多摄像头控制效率批量操作减少单独请求次数异步控制示例import asyncio from onvif.client import AsyncONVIFCamera async def async_capture(camera, path): media await camera.create_media_service() profile (await media.GetProfiles())[0] snapshot_uri await media.GetSnapshotUri({ ProfileToken: profile.token }) # 异步下载图片 # ... (使用aiohttp等异步HTTP客户端)4.3 与其他技术集成ONVIF常需要与RTSP、OpenCV等技术配合使用import cv2 class HybridCameraController: def __init__(self, onvif_camera, rtsp_url): self.onvif onvif_camera self.rtsp_url rtsp_url def get_live_frame(self): 获取实时视频帧 cap cv2.VideoCapture(self.rtsp_url) ret, frame cap.read() cap.release() return frame if ret else None def analyze_and_react(self): 分析画面并做出反应 frame self.get_live_frame() if frame is None: return # 简单的移动检测 gray cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) gray cv2.GaussianBlur(gray, (21, 21), 0) if not hasattr(self, background): self.background gray return frame_delta cv2.absdiff(self.background, gray) thresh cv2.threshold(frame_delta, 25, 255, cv2.THRESH_BINARY)[1] if cv2.countNonZero(thresh) 1000: # 检测到移动 self.onvif.capture_image(motion_detected.jpg)5. 实战案例智能园区监控系统假设我们要为一个园区部署智能监控系统需求包括每小时对重点区域自动拍照存档每天夜间执行预置点巡航检测到异常时自动调整摄像头角度并记录实现方案class SmartCampusMonitor: def __init__(self, cameras): self.cameras cameras # 多个摄像头控制器列表 def start_daily_routine(self): # 设置定时任务 schedule.every().hour.do(self.capture_key_areas) schedule.every().day.at(20:00).do(self.night_patrol) while True: schedule.run_pending() time.sleep(1) def capture_key_areas(self): for i, cam in enumerate(self.cameras): try: filename farea_{i1}_{time.strftime(%Y%m%d_%H%M)}.jpg cam.capture_image(filename) print(f区域{i1}已存档: {filename}) except Exception as e: print(f区域{i1}存档失败: {str(e)}) def night_patrol(self): print(开始夜间巡逻模式) for cam in self.cameras: try: preset_cruise(cam, interval45) except Exception as e: print(f巡逻异常: {str(e)})在实际项目中我们还需要考虑日志记录、异常报警、配置管理等功能。可以将这些控制逻辑封装为服务通过REST API或MQTT协议提供外部接口。

更多文章