Пример использования: автоматическое зависимое наблюдение-трансляция (аналитика ADS-B)

Автоматическое зависимое наблюдение-вещание — Википедия

Automatic Dependent Surveillance-Broadcast (ADS-B) — это технология наблюдения, при которой воздушное судно определяет…

en.wikipedia.org

Программный стек:

Система воздушного транспорта следующего поколения (NextGen)

Официальный сайт правительства США

www.faa.gov

Аппаратный стек:

Использованная литература:

Основной исходный код

Используя стек FLiP с открытым исходным кодом, мы можем легко отслеживать воздушные суда! Для этого требуется немного оборудования и немного магии Python. Вы можете создать свой собственный дома с помощью нескольких аппаратных средств и простого программного обеспечения с открытым исходным кодом.

Мы не только собираем, передаем, обогащаем и храним эти данные, но и приносим пользу миру. Вы можете проверить мою ленту на веб-сайте FlightAware.

Статистика Timothy Spann ADS-B Feeder — FlightAware

Вы можете помочь нам сохранить FlightAware бесплатным, разрешив рекламу с сайта FlightAware.com. Мы прилагаем все усилия, чтобы наша реклама…

Flightaware.com

Если вы считаете, что это похоже на некоторые данные, которые вам нужны для вашего Lakehouse, давайте начнем их принимать прямо сейчас. После того, как вы настроили свое оборудование, см. указания от Пиаваре. После того, как вы перезагрузились и все работает, вы сможете увидеть статус всего, что работает.

Консоль Raspberry Pi 4

Опубликованные онлайн-панели данных

Локальная лента рейсов работает на Raspberry Pi 4

Если вы хотите перейти к видео, демонстрирующему то, что я сделал, вы можете это сделать.

Давайте создадим темы, которые нам понадобятся для всех этих необработанных и обработанных полетных данных.bin/pulsar-admin topics create persistent://public/default/adsbraw

bin/pulsar-admin topics create persistent://public/default/aircraftbin/pulsar-admin topics create persistent://public/default/adsblogbin/pulsar-admin topics create persistent://public/default/adsbdead

Первая тема — это наши необработанные данные JSON ADS-B. Мы можем захотеть использовать это позже, чтобы позволить этой теме хранить данные навсегда, возможно, в какой-то момент мы включим многоуровневое хранилище и автоматически сохраним его в хранилище объектов в AWS, Google Cloud или Azure. У нас также есть тема для чистых данных, самолеты. Наконец, у нас есть тема для журналов, выводимых из нашей обработки, и тема для сообщений, которые можно обработать. Они не мертвы, поскольку мы можем воскресить их, как зомби, и снова обработать. Никогда не отказывайтесь от своих данных!

Первое, что я сделал, это изучил данные, которые отображались на этой красивой местной карте. Запустив Chrome в режиме разработчика, я увидел все вызовы REST. Я вижу данные JSON из простых вызовов REST. Здесь я взял то, что мне было нужно, и поместил это в этот удобный скрипт Python.

Поскольку мы собираемся передавать, хранить, анализировать, запрашивать и в целом делиться этими данными с различными разработчиками, аналитиками, специалистами по данным и другими людьми, мы должны убедиться, что знаем, что это за данные.

 hex (String optional)
flight (String optional - name of plane)
alt_baro (int optional - altitude)
alt_geom (int optional)
track (int optional)
baro_rate (int optional)
category (string optional)
nav_qnh (float optional)
nav_altitude_mcp (int optional)
nav_heading (float optional)
nic (int optional)
rc (int optional)
seen_pos (float optional)
version (int optional)
nic_baro (int optional)
nac_p (int optional)
nac_v (int optional)
sil (int optional)
sil_type (string optional)
mlat (array optional)
tisb (array optional)
messages (int optional)
seen (float optional)
rssi (float optional)
squawk (optional) - look at # conversion 7600, 7700, 4000, 5000, 7777, 6100, 5400, 4399, 4478, ...)
speed (optional)
mach (optional speed, mac to mph *767)
emergency (optional string)
lat (long optional)
lon (long optional)

Нам нужно определить схему с именами, типами и опциями. После того, как мы это сделали, мы можем создать для него схему JSON или Python и использовать ее в Apache Pulsar, Pulsar SQL (Presto/Trino), Apache Spark SQL, Apache Flink SQL и любом потребителе, который может читать схему из схемы Pulsar. Реестр. Данные без контракта — это просто байты.

  • поле: hex — идентификатор ИКАО
  • поле: рейс является ИДЕНТИФИКАЦИОННЫМ идентификатором
  • поле: altBaro — высота в футах (барометрическая)
  • поле: lat, lon — широта и долгота
  • поле: gs — путевая скорость в узлах.
  • поле: altGeom — высота (геометрическая)

Я посмотрел, что это за поля, и сделал несколько заметок. Значения Squawk интересны и могут быть интересны людям, использующим SQL позже.

Как только вы загрузите данные в класс Java и начнете отправлять сообщения в тему вашего самолета, вы можете извлечь автоматически сгенерированный схема.

bin/pulsar-admin schemas get persistent://public/default/aircraft

Давайте создадим наше приложение Python для сбора данных и публикации их в Pulsar! Мы могли бы использовать множество различных библиотек, поскольку Pulsar поддерживает множество протоколов, таких как Websockets, Kafka, MQTT, AMQP и RocketMQ. Чтобы все было просто и ванильно, я собираюсь использовать проверенный и надежный протокол Pulsar и стандартную библиотеку Python Pulsar. Я установил последнюю версию с pip3.10 установить пульсар-клиент[all]. Я сделал все это, потому что мне нужна была библиотека FastAvr, GRPC, схемы и другие причудливые вещи. Вы можете установить то, что вам нужно.

Полный код Python здесь. Я покажу вам важные моменты.

import pulsar
from pulsar import Client, AuthenticationOauth2
client = pulsar.Client(service_url, authentication = AuthenticationOauth2(auth_params))producer = client.create_producer(topic=topic ,properties={"producer-name": "adsb-rest","producer-id": "adsb-py" })uniqueid = 'thrml_{0}_{1}'.format(randomword(3),strftime("%Y%m%d%H%M%S",gmtime()))uuid2 = '{0}_{1}'.format(strftime("%Y%m%d%H%M%S",gmtime()),uuid.uuid4())url_data = "http://localhost:8080/data/aircraft.json?_=" + str(uuid.uuid4())response = json.dumps(requests.get(url_data).json())producer.send(response.encode('utf-8'),partition_key=uniqueid)

Затем мы можем запустить его и начать создавать необработанные данные ADS-B JSON для нашей темы Pulsar.

Давайте проверим, поступают ли данные.

bin/pulsar-client consume "persistent://public/default/adsbraw" -s adsbrawreader -n 0

Как выглядят эти необработанные данные?

{'now': 1659471117.0, 'messages': 7381380, 'aircraft': [{'hex': 'ae6d7a', 'alt_baro': 25000, 'mlat': [], 'tisb': [], 'messages': 177, 'seen': 0.1, 'rssi': -22.7}, {'hex': 'a66174', 'alt_baro': 23000, 'mlat': [], 'tisb': [], 'messages': 5, 'seen': 23.6, 'rssi': -27.8}, .. }

Ну, этого много. Я сократил это для прокрутки.

Спасибо, что остаетесь с нами до сих пор, вот кот.

Этот кот мой, этот кот толстый. Она в порядке с этим.

Необработанные данные хороши и все такое, я, конечно, могу использовать Apache Spark, Flink, Python или другие программы для их очистки. Я рекомендую вам настроить приемник Delta Lake, Apache Hudi или Apache Iceberg для хранения этих необработанных данных в вашем домике у озера, если хотите.

Примечание архитектора: Вы можете просто оставить его в Apache Pulsar навсегда или в многоуровневом хранилище, контролируемом Apache Pulsar.

Я хотел сделать это быстро и автоматически в среде Pulsar, поэтому я написал быстрый Ява Функция Pulsar для разделения, анализа, обогащения, очистки и направления данных в новую тему. Это будет очищенная тема данных. Мы могли бы встроить эту функцию в Питон или же Голанг также. В этот раз я выбрал Java.

Да, нам пришлось создать собственную функцию, прежде чем мы смогли развернуть ее на шаге 4. Давайте быстро взглянем на нашу Java-функция:

public class ADSBFunction implements Function<byte[], Void> {

Это означает, что наша функция принимает необработанные байты, но ничего не возвращает. У нас нет указанного здесь вывода, так как я буду динамически решать, куда отправить вывод. Мы могли отправить на любое количество тем на лету.

context.newOutputMessage(PERSISTENT_PUBLIC_DEFAULT, JSONSchema.of(Aircraft.class)) .key(UUID.randomUUID().toString()) .property(LANGUAGE, JAVA) .value(aircraft).send();

В конце моей функции я собираюсь отправить данные в тему (можно создать их на лету, если мы хотим отправить их в разные темы). Возможно, мы захотим отправить все полеты Илона Маска в специальную тему. Мы могли бы выполнить эти поиски с помощью чего-то вроде Сцилла, я сделал это для своего приложения Air Quality. Я добавляю ключ, добавляю свойство, устанавливаю схему и отправляю данные. Что здесь хорошо, так это то, что мне не нужно использовать формальный язык для определения схемы. Я могу просто создать старый добрый Java Bean. Сохраняя это простым и старомодным, это работает для меня.

Между этим кодом у меня есть помощник оказание услуг который анализирует этот беспорядок JSON и извлекает хорошие биты по одному событию самолета за раз. Это довольно просто, но приятно хранить этот код в простой функции, которая запускается при каждом событии или сообщении, поступающем в тему ADSBRAW. Вам понадобится Java JDK (11 или 17) и Maven. Я рекомендую вам использовать SDKMan так что вы можете запускать несколько JVMS и инструментов.

Чтобы построить функцию, нам просто нужно ввести:

mvn package

Pulsar упрощает добавление обработки событий по каждой теме, поэтому я создал простую обработку на Java. Их очень легко развертывать, отслеживать, запускать, останавливать и удалять.

Давайте развернем наш функция:

bin/pulsar-admin functions create --auto-ack true --jar /opt/demo/java/pulsar-adsb-function/target/adsb-1.0.jar --classname "dev.pulsarfunction.adsb.ADSBFunction" --dead-letter-topic "persistent://public/default/adsbdead" --inputs "persistent://public/default/adsbraw" --log-topic "persistent://public/default/adsblog" --name ADSB --namespace default --tenant public --max-message-retries 5

Для людей, плохо знакомых с Apache Pulsar, обратите внимание, что мы должны указать пространство имен и арендатора, где это будет жить. Это сделано для возможности обнаружения, мультиарендности и просто чистоты. У нас может быть столько входных тем, сколько мы хотим. Темы журнала и недоставленных сообщений предназначены для специальных выходных данных. В этом случае для Java наше приложение хранится в файле JAR.

После развертывания давайте проверим статус:

bin/pulsar-admin functions status --name ADSB

Результатом является JSON, который поддается автоматизации DevOps. Если бы мы захотели, мы могли бы управлять этим с помощью REST или инструмента DevOps.

{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [{
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 28,
"numSuccessfullyProcessed" : 28,
"numUserExceptions" : 0,
"latestUserExceptions" : [],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [],
"averageLatency" : 144.23374035714286,
"lastInvocationTime" : 1659725881406,
"workerId" : "c-standalone-fw-127.0.0.1-8080"
}
} ]
}

Если бы мы хотели остановить это:

bin/pulsar-admin functions stop --name ADSB --namespace default --tenant public

Если нам нужно удалить его:

bin/pulsar-admin functions delete --name ADSB --namespace default --tenant public

Теперь, когда функция обработала данные, давайте быстро проверим эти чистые данные.

bin/pulsar-client consume "persistent://public/default/aircraft" -s "aircraftconsumer" -n 0

Пример возвращаемой строки JSON:

----- got message -----
key:[c480cd8e-a803-47fe-81b4-aafdec0f6b68], properties:[language=Java], content:{"flight":"N86HZ","category":"A7","emergency":"none","squawk":1200,"hex":"abcd45","gs":52.2,"track":106.7,"lat":40.219757,"lon":-74.580566,"nic":9,"rc":75,"version":2,"sil":3,"gva":2,"sda":2,"mlat":[],"tisb":[],"messages":2259,"seen":1.1,"rssi":-19.9}

Примечание архитектора: Всегда устанавливайте ключ, когда вы создаете сообщения.

Теперь у нас есть чистые данные!

Давайте проверим этот поток данных с помощью Apache Spark Structured Streaming!

val dfPulsar = spark.readStream.format("pulsar").option("service.url", "pulsar://pulsar1:6650").option("admin.url", " "persistent://public/default/aircraft").load()

dfPulsar.printSchema()
root
|-- altBaro: integer (nullable = true)
|-- altGeom: integer (nullable = true)
|-- baroRate: integer (nullable = true)
|-- category: string (nullable = true)
|-- emergency: string (nullable = true)
|-- flight: string (nullable = true)
|-- gs: double (nullable = true)
|-- gva: integer (nullable = true)
|-- hex: string (nullable = true)
|-- lat: double (nullable = true)
|-- lon: double (nullable = true)
|-- mach: double (nullable = true)
|-- messages: integer (nullable = true)
|-- mlat: array (nullable = true)
| |-- element: struct (containsNull = false)
|-- nacP: integer (nullable = true)
|-- nacV: integer (nullable = true)
|-- navAltitudeMcp: integer (nullable = true)
|-- navHeading: double (nullable = true)
|-- navQnh: double (nullable = true)
|-- nic: integer (nullable = true)
|-- nicBaro: integer (nullable = true)
|-- rc: integer (nullable = true)
|-- rssi: double (nullable = true)
|-- sda: integer (nullable = true)
|-- seen: double (nullable = true)
|-- seenPos: double (nullable = true)
|-- sil: integer (nullable = true)
|-- silType: string (nullable = true)
|-- speed: double (nullable = true)
|-- squawk: integer (nullable = true)
|-- tisb: array (nullable = true)
| |-- element: struct (containsNull = false)
|-- track: double (nullable = true)
|-- version: integer (nullable = true)
|-- __key: binary (nullable = true)
|--__topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|--__publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|--__messageProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)

val pQuery = dfPulsar.selectExpr("*").writeStream.format("console")
.option("truncate", false).start()

Приведенный выше код Spark, подключенный к кластеру Pulsar, получил данные из нашей темы Pulsar и построил таблицу. Как видите, установка этой схемы — отличная идея. Затем мы можем легко запросить его как микропакет и в этом случае вывести его на консоль для отладки. Мы также могли бы отправить этот поток куда-нибудь еще, например, на S3.

Давайте запустим непрерывный SQL-запрос с помощью Apache Flink.

CREATE CATALOG pulsar WITH (
'type' = 'pulsar',
'service-url' = 'pulsar://pulsar1:6650',
'admin-url' = 'http://pulsar1:8080',
'format' = 'json'
);

USE CATALOG pulsar;

Создаем каталог для подключения Flink к Pulsar.

Давайте посмотрим на нашу таблицу.

describe aircraft;
+------------------+-----------------------+------+-----+--------+-----------+
| name | type | null | key | extras | watermark |
+------------------+-----------------------+------+-----+--------+-----------+
| alt_baro | INT | true | | | |
| alt_geom | INT | true | | | |
| baro_rate | INT | true | | | |
| category | STRING | true | | | |
| emergency | STRING | true | | | |
| flight | STRING | true | | | |
| gs | DOUBLE | true | | | |
| gva | INT | true | | | |
| hex | STRING | true | | | |
| lat | DOUBLE | true | | | |
| lon | DOUBLE | true | | | |
| mach | DOUBLE | true | | | |
| messages | INT | true | | | |
| mlat | ARRAY<ROW<> NOT NULL> | true | | | |
| nac_p | INT | true | | | |
| nac_v | INT | true | | | |
| nav_altitude_mcp | INT | true | | | |
| nav_heading | DOUBLE | true | | | |
| nav_qnh | DOUBLE | true | | | |
| nic | INT | true | | | |
| nic_baro | INT | true | | | |
| rc | INT | true | | | |
| rssi | DOUBLE | true | | | |
| sda | INT | true | | | |
| seen | DOUBLE | true | | | |
| seen_post | DOUBLE | true | | | |
| sil | INT | true | | | |
| sil_type | STRING | true | | | |
| speed | DOUBLE | true | | | |
| squawk | INT | true | | | |
| tisb | ARRAY<ROW<> NOT NULL> | true | | | |
| track | DOUBLE | true | | | |
| version | INT | true | | | |
+------------------+-----------------------+------+-----+--------+-----------+
33 rows in set

Давайте запустим несколько простых запросов.

select alt_baro,
gs,
alt_geom,
baro_rate,
mach,
hex, flight, lat, lon
from aircraft;

select max(alt_baro) as MaxAltitudeFeet, min(alt_baro) as MinAltitudeFeet, avg(alt_baro) as AvgAltitudeFeet,
max(alt_geom) as MaxGAltitudeFeet, min(alt_geom) as MinGAltitudeFeet, avg(alt_geom) as AvgGAltitudeFeet,
max(gs) as MaxGroundSpeed, min(gs) as MinGroundSpeed, avg(gs) as AvgGroundSpeed,
count(alt_baro) as RowCount,
hex as ICAO, flight as IDENT
from aircraft
group by flight, hex;

Мы можем выполнять базовые запросы, которые будут возвращать каждую поступившую строку или агрегировать их.

Мы сделали это. Давайте начнем транслировать нашу маленькую трансляцию мечты.

Спасибо, что остались за все здание приложения. Надеюсь скоро увидеть вас на встрече или мероприятии. Контакт мне если у вас есть какие-либо вопросы или вы ищете другие приложения, созданные на базе FLiPN+.

Дополнительные ресурсы для создания более продвинутых приложений: