Search the Community
Showing results for tags 'python'.
Found 2 results
-
Доброе время суток. Сегодня пойдет речь о том, как переправлять данные с MQTT брокера в базу данных MySQL. Транспортировать будем как сами адреса, так и значения всех топиков, на которые оформлена подписка. Данную задачу нельзя назвать распространенной, но все же, она имеет место быть и может пригодиться в том случае, если данные востребованы в системах не способных работать с MQTT протоколом самостоятельно или брокер находится в изолированной системе, а данные востребованы, например, в GUI за её приделами. Для осуществления задуманного нам потребуется самостоятельный процесс, который сыграет роль транспортного узла между MQTT брокером и базой данный MySQL. А значит, его придется где-то держать. В моем случае, это сервер под управлением операционной системой Linux Ubuntu 16.04.3 и дальнейшее описание будет под неё, но для других ОС действия аналогичные. Сам демон будет написан на Python и для его работы нам потребуется: python3 python-pip python-dev libmysqlclient-dev библиотека paho-mqtt для python https://pypi.python.org/pypi/paho-mqtt библиотека mysqlclient для python https://github.com/PyMySQL/mysqlclient-python Но начнем мы, в первую очередь, с подготовки базы данных. На плечи MySQL ляжет не только хранение, но и частичная обработка данных. Для этих целей нам потребуется отдельная база, хранимая процедура и функция с реализацией небольшой логики, и пользователь с ограниченными правами под чьим именем мы будем обращаться к ним. В конце поста, кроме самого демона, будет опубликован .sql файл, который достаточно просто импортировать, например с помощью стандартный средств базы данных mysql -uroot -p Вводим пароль администратора MySQL и импортируем наш .sql файл, но прежде, дочитайте статью до конца, возможно, Вы захотите внести свои изменения. mysql> source /media/mqttMySqlClient.sql После этого будет получен следующий результат: Создана база (схема) с именем mqtt Пользователь с именем mqtt-agent и паролем p@$$w0rd имеющий возможность подключаться с внешних адресов Пользователю будут назначены ограниченные права (только EXECUTE) в этой схеме Будет добавлена процедура update_topic, на которую ляжет задача добавления и обновления данных Будет добавлена функция get_topic для упрощения поиска данных На тот случай, если Вы захотите внести изменения или создать все ручками, рассмотрим содержимое sql файла. Если схема mqtt не существует, она будет создана CREATE DATABASE IF NOT EXISTS `mqtt`; Аналогичным образом будет создан пользователь mqtt-agent. Если необходимо конкретизировать, с какого адреса будет производиться подключение под этим пользователем, то замените % на доменное имя или ip адрес хоста. Если планируется использовать демона на том же сервере где установлен MySQL, замените % на localhost. Также разрешено не более 2 активных подключений, измените это значение на необходимое Вам. CREATE USER IF NOT EXISTS 'mqtt-agent'@'%' IDENTIFIED BY 'p@$$w0rd' WITH MAX_USER_CONNECTIONS 2; Пользователю будут выставлены ограниченные права. Ему будет разрешено пользоваться только хранимыми процедурами и функциями. Никакие самостоятельные запросы выполнять нельзя. GRANT EXECUTE ON `mqtt`.* TO 'mqtt-agent'@'%'; Переходим к работе с самой схемой USE `mqtt`; Будет создана таблица topics со следующей структурой. md5 - содержит уникальный одноименный хеш полученный из полного адреса топика. Именно по этому ключу и будет производиться поиск данных. Почему именно по нему, а не по самому имени? Дело в том, что md5 хеш имеет фиксированную, заранее известную, длину, что нельзя сказать о имени топика. Именно это ограничение не позволит сделать имя топика первичным ключом и явно идентифицировать данные в таблице. time - содержит UNIX время добавления/обновления данных по конкретному топику (по умолчанию GMT+0) topic - адрес топика. В контексте, упомянутого ранее, поля md5, не несет для нас никакой смысловой нагрузки. value - данные опубликованные в топике. DROP TABLE IF EXISTS `topics`; CREATE TABLE `topics` ( `md5` varchar(32) NOT NULL, `time` bigint(20) DEFAULT NULL, `topic` text, `value` text, PRIMARY KEY (`md5`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; Теперь необходимо создать хранимые процедуры и функции, но сделать это будет невозможно из-за присутствие в их синтаксисе разделителя совпадающего с концом данных в sql запросе - ";" Чтобы избежать этот неловкий момент, изменяем разделить на произвольный. DELIMITER $$ Создает процедуру update_topic. Она принимает в качестве входных параметров два значения, адрес топика и опубликованные данные. Оба параметра являются текстовыми. Процедура, вычисляет md5 хеш из адреса топика и уже по нему производит поиск записи в таблице. Если запись не будет найдена, она будет создана, в противном случае данные в поле value будут обновлены. Данная процедура должна ускорить работу демона и избавить его от задержек, которые были бы неминуемы при выполнении этих же запросов на стороне клиента. DROP PROCEDURE IF EXISTS `update_topic`$$ CREATE DEFINER=CURRENT_USER() PROCEDURE `update_topic`(topic text, value text) BEGIN declare nMD5 varchar(32) default md5(topic); declare NUM bit; declare uTime bigint(20) default UNIX_TIMESTAMP(); SELECT COUNT(t.md5) INTO NUM FROM topics t WHERE t.md5 = nMD5; if NUM <> 1 then INSERT INTO topics VALUES(nMD5, uTime, topic, value); else UPDATE topics t SET t.time = uTime, t.value = value WHERE t.md5 = nMD5; end if; END$$ Также будет добавлена функция get_topic. Она необходима для запроса данных от имени созданного ранее пользователя и ограничений, наложенных на него. Функция принимает адрес топика в текстовом виде, производит вычисление md5 хеша и основываясь на его совпадении с имеющимися записями выводит значение поля value искомого топика. DROP FUNCTION IF EXISTS `get_topic`$$ CREATE DEFINER=CURRENT_USER() FUNCTION `get_topic` (topic text) RETURNS text BEGIN declare hMD5 varchar(32) default md5(topic); declare topicValue text; SELECT t.value INTO topicValue FROM topics t WHERE t.md5 = hMD5; RETURN topicValue; END$$ И в завершении всего, будет восстановлено стандартное значение разделителя. DELIMITER ; На этом разбор sql файла можно считать законченным. Он не содержит каких-либо сложным манипуляций и должен быть понятен. Все эти операции можно выполнить руками, но я совету воспользоваться импортом, как и было описано ранее. Переходим к демону В первую очередь устанавливаем необходимые пакеты. sudo apt-get install python3 python-pip python-dev libmysqlclient-dev Устанавливаем недостающие библиотеки для Python pip install paho-mqtt mysqlclient Добавим пользователя из-под которого будет запускаться демон sudo useradd --shell /usr/sbin/nologin --system mqtt-agent Выставляем все необходимые права (каталог /media как пример) sudo chown mqtt-agent:mqtt-agent /media/mqttMySqlClient.py sudo chmod 0700 /media/mqttMySqlClient.py Добавляем демона в автозагрузку через планировщик задач и от имени созданного пользователя sudo crontab -u mqtt-agent -e Добавляем в конец следующую запись @reboot /media/mqttMySqlClient.py start Запускаем демона от имени все того же пользователя sudo -u mqtt-agent /media/mqttMySqlClient.py start Это основное, что требуется сделать на сервере для организации работы демона. Переходим к разбору настроек программы Т.к изначально за основу была взята концепция другого демона из ветки Zabbix, то конфигурация перекочевала оттуда и аналогично разбита на несколько секций. """ Настройки MQTT """ mqtt_server = "mqtt.it4it.club" mqtt_port = 1883 mqtt_login = "" mqtt_password = "" mqtt_client_id = "mqttMySqlClient" Настройки подключения к брокеру. Все должно быть интуитивно понятным. Помните, что при совпадении идентификаторов клиентов, они начнут конкурировать за подключение и по очереди терять связь. Не допускайте их совпадений. """ Список топиков для подписки """ subscribe = { '$SYS/#', '#', } Список топиков для подписки указывается через запятую и в кавычках. """ Настройки MySQL """ mysql_host = "127.0.0.1" mysql_port = 3306 mysql_user = "mqtt-agent" mysql_passwd = "p@$$w0rd" mysql_schema = "mqtt" mysql_log_file = "/var/log/mqttMySqlClient.log" Настройки подключения с MySQL серверу также не должны вызывать вопросов. """ Настройки общие """ pid_file = "/tmp/mqttMySqlClient.pid" Последний параметр указывает на размещение .pid файла демона. Команды управления классические start - запуск в режиме демона stop - остановка в режиме демона restart - перезапуск в режиме демона window - запуск в оконном режиме, также позволяет запускать процесс в операционных системах Windows После запуска, демон пытается установить связь с MQTT брокером и пока это не произойдет, связь с MySQL сервером устанавливаться не будет. Если во время работы, связь с брокером будет потеряна то в принудительном порядке, будет разорвано соединение с базой данных. Таким образом, по активным сессиям MySQL сервера можно судить о наличии связи у демона с брокером. Во время простоя, а в нашем случае, это отсутствие потока данных от брокера, для проверки связи с сервером базы данных будет использована процедура эмуляции ping для MySQL сервера. Она представляет из себя простую арифметическую задачу на сложение не приводящей к работе с данными в базе. Операция выполняется крайне быстро и её удачное выполнение сигнализирует клиенту о наличии связи с базой, а базе об активности клиента. В связи с этим, периодическая активность клиента при отсутствие данных от брокера, является показателем нормальной работы. И на последок Если вы хотите произвести выборку из таблицы под именем пользователя используемым по умолчанию и с его ограничениями. Необходимо воспользоваться функцией get_topic с указанием полного адреса, интересующего топика. select mqtt.get_topic('$SYS/broker/version'); В ответ мы получим +---------------------------------------+ | mqtt.get_topic('$SYS/broker/version') | +---------------------------------------+ | mosquitto version 1.4.8 | +---------------------------------------+ 1 row in set (0,00 sec) На этом пока все. Файлы проекта: PS: Это тестовая версия демона и возможно она будет претерпевать некоторые изменения.
-