Mit Nachrichten lassen sich unkompliziert Daten zwischen Prozessen austauschen. Allerdings kommt es darauf an, wie man sie einsetzt: Es können sehr flexible und erweiterbare Systeme entstehen, oder aber starre und fehleranfällige Monster.
Der vorige Beitrag dieser Serie entwickelte einen Mechanismus, der mithilfe von Queues Nachrichten von einem Skript zu einem anderen befördert. Queues können Nachrichten allerdings nur in eine Richtung versenden, was die Kommunikation mit mehreren Prozessen umständlich macht. Entwickler, die unser Modul-Framework einsetzen, sollten sich nicht mit derartigen Details beschäftigen müssen. Deshalb schreiben wir dieses Mal ein Modul namens »ipc«, das die Queues hinter einer einfachen API zum Austausch von Nachrichten versteckt.
Da eine bidirektionale Kommunikation zwei Queues voraussetzt, wäre es naheliegend, für jedes Prozesspaar zwei solcher Warteschlangen zu verwenden. Ein Prozess, der mit mehreren anderen Prozessen kommuniziert, müsste sich dann aber merken, welche Queue zu welchem Prozess gehört. Das ist zwar nicht unmöglich, doch es geht auch einfacher.
Inspiration finden wir in der analogen Welt: Wir spendieren jedem Prozess nur eine Queue, die er dafür aber wie einen Briefkasten verwendet. Um einem Prozess eine Nachricht zu schicken, müssen wir dann lediglich wissen, wo der passende Briefkasten steht. Damit der Empfänger wiederum erkennen kann, wer die Nachricht versandt hat, muss sie – wie auch in der analogen Welt – in einem Briefumschlag mit Empfänger und Absender stecken.
|
Verteilte Systeme in der Bash (Teil 1) |
LM 06/2022, S. 76 |
|
|
Verteilte Systeme in der Bash (Teil 2) |
LM 07/2022, S. 82 |
|
|
Framework für automatisierte Tests |
LM 08/2022, S. 80 |
|
|
Komfortables Logging für die Bash |
LM 09/2022, S. 78 |
|
|
Kommandozeilenparser im Eigenbau |
LM 10/2022, S. 72 |
|
|
Mutexe und Semaphoren in der Bash |
LM 11/2022, S. 80 |
|
|
Deadlock-freie Queues in der Bash |
LM 12/2022, S.82 |
|
|
IPC mit den Mitteln der Bash |
LM**01/2023, S.80 |
<U>https://www.lm-online.de/48493<U> |
Endpunkte
Mit unserem IPC-Modul wollen wir aber Details wie die Queues vor dem Entwickler verstecken. Daher führen wir das Konzept des Endpunkts ein: Ähnlich wie ein Socket im Netzwerk-Stack ist ein Endpunkt eine Datenstruktur, die dem Senden und Empfangen von Nachrichten dient. In unserem Fall handelt es sich um ein Verzeichnis, das unter anderem eine Queue enthält. Als Adresse eines Endpunkts gilt sein Speicherort im Dateisystem relativ zum Verzeichnis »/var/lib/bms/ipc/«.
Ein Endpunkt kann eine öffentliche oder eine private Adresse haben, wobei als öffentliche Adresse eine Zeichenkette wie »pub/echo« fungiert. Eine private Adresse hingegen fängt mit »priv/$user« an und wird vom IPC-Modul zufällig vergeben. Öffentliche Adressen entsprechen in etwa den wohldefinierten Ports bei TCP/IP; private Adressen entsprechen den Ports, die der Kernel zufällig vergibt.
Ein Endpunkt mit einer öffentlichen Adresse lässt sich durch einen Aufruf wie den aus der ersten Zeile von Listing 1 vergeben. Wer keine öffentliche Adresse benötigt, kann sie wie im Aufruf in Zeile 2 weglassen. Die Funktion gibt den Pfad des erstellten Endpunkts zurück. Dieser Pfad wird als erstes Argument zum Empfangen (Zeile 3) respektive zum Versenden (Zeile 4) einer Nachricht an die anderen IPC-Funktionen übergeben.
Listing 1
Endpunkte
endp=$(ipc_endpoint_open "pub/echo") endp=$(ipc_endpoint_open) msg=$(ipc_endpoint_recv "$endp") ipc_endpoint_send "$endp" "pub/echo" "Hallo Welt!"
Gut verpackt
Die Daten, die wir nach einem erfolgreichen Aufruf von »ipc_endpoint_recv()« erhalten, sind in einem Umschlag verpackt. Wir statten das Modul daher mit einer Reihe von Funktionen aus, mit denen wir Nachrichten auspacken können: Mit »ipc_msg_get_source()« und »ipc_msg_get_destination()« bekommen wir die Adresse des Absenders beziehungsweise Empfängers der Nachricht. Über »ipc_msg_get_user()« sowie »ipc_msg_get_timestamp()« lässt sich herausfinden, welcher Benutzer eine Nachricht wann versendet hat. Den Inhalt des Umschlags, also die eigentliche Nachricht, erhalten wir über die Funktion »ipc_msg_get_data()«. Nachrichten implementieren wir als JSON-Objekte, die zwischen Endpoints übertragen werden (Listing 2).
Listing 2
JSON-Briefumschlag
{
"source": "priv/mk.bash.30928.1667014015.30758",
"destination": "pub/echo",
"user": "mk",
"timestamp": 1667014035,
"data": "SGFsbG8gV2VsdCEK"
}
Wie schon bei der Implementierung der Queue kodieren wir die Nutzdaten in Base64, damit wir sie ohne Bedenken in das JSON-Objekt einbetten können. Das Erstellen der Nachricht geht relativ leicht von der Hand, wie die Hilfsfunktion »_ipc_msg_new()« in Listing 3 zeigt.
Das Auslesen von Nachrichten hingegen ist erheblich komplizierter und eine beliebte Quelle von Bugs. Wir machen deshalb einen großen Bogen um die Idee, Nachrichten von Hand zu parsen, und greifen auf den Befehl »jq« zurück, der sich in den Paketquellen aller gängigen Distributionen findet. Listing 4 zeigt, wie sich die Funktion »ipc_msg_get_data()« mit Jq realisieren lässt. Die weiteren Funktionen unterscheiden sich nur darin, dass die anderen Felder nicht dekodiert werden müssen.
Listing 3
JSON erzeugen
_ipc_msg_new() {
local source="$1"
local destination="$2"
local data="$3"
local -i timestamp
local encoded_data
if ! timestamp=$(date +"%s"); then
return 1
fi
if ! encoded_data=$(base64 -w 0 <<< "$data"); then
return 1
fi
printf '{"source": "%s",' "$source"
printf ' "destination": "%s",' "$destination"
printf ' "user": "%s",' "$USER",
printf ' "timestamp": %d,' "$timestamp"
printf ' "data": "%s"}\n' "$encoded_data"
return 0
}
Listing 4
JSON mit Jq parsen
ipc_msg_get_data() {
local msg="$1"
local encoded_data
local data
if ! encoded_data=$(jq -e -r '.data' <<< "$msg"); then
return 1
fi
if ! data=$(base64 -d <<< "$encoded_data"); then
return 1
fi
echo "$data"
return 0
}
Somit ist auch klar, wie die Funktion »ipc_endpoint_send()« auszusehen hat. Mit einem Aufruf von »_ipc_msg_new()« erzeugt sie eine neue Nachricht im JSON-Format und wirft sie kurzerhand per »queue_put()« in den Briefkasten des Empfängers (Listing 5).
Listing 5
Nachrichten senden
ipc_endpoint_send() {
local from="$1"
local to="$2"
local data="$3"
local msg
local queue
queue="$__ipc_root/$to/queue"
if ! msg=$(_ipc_msg_new "$from" "$to" "$data"); then
return 1
fi
if ! queue_put "$queue" "$msg"; then
return 1
fi
return 0
}
Der Funktion »ipc_endpoint_recv()«, die das Empfangen von Nachrichten implementiert, wollen wir ein Timeout übergeben können, damit Skripte nicht ewig auf Nachrichten warten. Die Funktion wird dadurch aber nicht komplizierter, da wir Timeouts schon in der Queue implementiert haben. Wir müssen daher den Timeout nur an »queue_get()« durchreichen (Listing 6). Wichtig ist dabei, »-1« als Timeout weiterzureichen, falls der Aufrufer keinen Wert übergeben hat.
Listing 6
Nachrichten empfangen
ipc_endpoint_recv() {
local endpoint="$1"
local -i timeout="${2--1}"
local queue
queue="$__ipc_root/$endpoint/queue"
if ! queue_get "$queue" "$timeout"; then
return 1
fi
return 0
}
Modulkonstruktor
Nun fehlen nur noch der Modulkonstruktor und die Funktionen zum Öffnen und Schließen von Endpunkten. Wie wir bereits in den anderen Funktionen gesehen haben, muss der Modulkonstruktor die Variable »__ipc_root« deklarieren, die auf das Verzeichnis zeigt, in dem sich alle Endpunkte befinden. Damit die Kommunikation mit mehreren Benutzern korrekt funktioniert, sind an dieser Stelle einige Kniffe nötig.
Zunächst benötigen wir eine Benutzergruppe bms_ipc, die wir kurzerhand mit »groupadd bms_ipc« erstellen. Alle Benutzer, die das IPC-Modul verwenden wollen, müssen in dieser Gruppe sein. Die Verzeichnisse »pub/« und »priv/« in »/var/lib/bms/ipc/« müssen wir ebenfalls von Hand erzeugen, da sie besondere Berechtigungen brauchen.
Wenn ein Prozess eine Nachricht an einen Endpunkt sendet, muss er in der Lage sein, Mutexe des Endpunkts zu öffnen und zu schließen. Das heißt, er muss Symlinks in den Unterverzeichnissen des Endpunkts erstellen und löschen, wofür er Schreibzugriff auf die Verzeichnisse braucht. Zugriffsrechte im IPC-Verzeichnis erhält ein Prozess über die Gruppe, weshalb alle Dateien und Unterverzeichnisse im IPC-Verzeichnis der Gruppe bms_ipc gehören müssen.
Wenn unter Linux ein Nutzer eine Datei oder ein Verzeichnis erstellt, gehört es aber zunächst der primären Gruppe des Benutzers. Wir müssten also jedes Mal, wenn wir ein neues Objekt im Dateisystem erstellen, die Gruppe händisch anpassen. Glücklicherweise gibt es eine Alternative: Wenn wir mit »chmod g+s Ordner« das Setgid-Bit eines Verzeichnisses setzen, erben alle Dateien und Verzeichnisse, die fortan in dem Verzeichnis erstellt werden, die Gruppenzugehörigkeit des Elternverzeichnisses. Verzeichnisse erben außerdem das Setgid-Bit, weshalb wir durch Setzen des Bits auf dem IPC-Verzeichnis erreichen, dass alle Objekte in dem Verzeichnis der Gruppe bms_ipc gehören. Das IPC-Verzeichnis erstellen wir daher mit den Befehlen aus Listing 7.
Listing 7
IPC-Verzeichnis erzeugen
$ mkdir -p /var/lib/bms/ipc/{pub,priv}
$ chown -R root:bms_ipc /var/lib/bms/ipc chmod -R g+rwxs /var/lib/bms/ipc
Im Modulkonstruktor müssen wir nun nichts weiter tun, als die Module »queue« und »log« einzubinden und die nötigen globalen Variablen zu deklarieren. Schon interessanter ist die Funktion »ipc_endpoint_open()«, die einen neuen Endpunkt erstellt. Sie muss zunächst den Namen des Endpunkts festlegen, falls der Anrufer keinen öffentlichen Namen angefordert hat. Als Namen verwenden wir eine Kombination aus dem Benutzernamen, dem Namen des Skripts, der PID, dem Zeitstempel und einer Zufallszahl. So verringern wir nicht nur das Risiko, dass zwei Prozesse denselben Endpunkt erstellen: Der Name des Endpunkts kann auch beim Debugging nützlich sein.
Nachdem das Verzeichnis des Endpunkts erstellt ist, brauchen wir eine Queue für den Endpunkt. Das erfordert allerdings einen nicht ganz stubenreinen Trick. Die Queue speichert Daten in einer internen Datei, die erst beim ersten Aufruf von »queue_put()« erzeugt wird. Beim Erstellen des Endpunkts müssen wir der Gruppe aber Schreibzugriff auf die Datei geben, weshalb wir sie hier per »touch« erstellen. Schöner wäre es, den Aufruf von »touch« in »queue_init()« unterzubringen. Die vollständige Funktion zum Öffnen von Endpunkten zeigt Listing 8. Das Modul legen wir mit dem Namen »ipc.sh« unter »/usr/local/share/bms/include/« ab.
Listing 8
Endpunkt erstellen
ipc_endpoint_open() {
local name="$1"
local endpoint
if [[ -z "$name" ]]; then
local self
local timestamp
self="${0##*/}"
if ! timestamp=$(date +"%s"); then
return 1
fi
name="priv/$USER.$self.$$.$timestamp.$RANDOM"
fi
endpoint="$__ipc_root/$name"
if ! [ -d "$endpoint" ]; then
if ! mkdir -p "$endpoint"; then
return 1
fi
if ! queue_init "$endpoint/queue" ||
! touch "$endpoint/queue/data" ||
! echo "$USER" > "$endpoint/owner" ||
! chmod -R g+rwxs "$endpoint"; then
if ! rm -rf "$endpoint"; then
log_error "Could not clean up $endpoint"
fi
return 1
fi
fi
echo "$name"
return 0
}
Funktionstest
Wir wollen nun mit einem möglichst einfachen Beispiel prüfen, ob das Modul richtig funktioniert. Dazu implementieren wir das Echo-Protokoll mit unserem neuen IPC-Mechanismus. Dazu benötigen wir einen Server und einen Client. Letzterer sendet eine beliebige Nachricht an den Server, der wiederum nichts anderes tut, als die Nachricht an den Client zurückzusenden.
Entsprechend unkompliziert fällt der Code für den Echo-Server aus: Wir öffnen mit »ipc_endpoint_open()« einen Endpunkt unter der Adresse »pub/echo« und empfangen mit »ipc_endpoint_recv()« so lange Nachrichten, bis ein Fehler auftritt. Wenn wir eine Nachricht empfangen haben, ermitteln wir mit »ipc_msg_get_source()« den Absender und mit »ipc_msg_get_data()« den Inhalt der Nachricht, die wir anschließend mit »ipc_endpoint_send()« zurücksenden (Listing 9).
Listing 9
echo-server.sh (Auszug)
main() {
local endp
local msg
if ! endp=$(ipc_endpoint_open "pub/echo"); then
return 1
fi
while msg=$(ipc_endpoint_recv "$endp"); do
local source
local data
source=$(ipc_msg_get_source "$msg")
data=$(ipc_msg_get_data "$msg")
if ! ipc_endpoint_send "$endp" "$source" "$data"; then
break
fi
done
ipc_endpoint_close "$endp"
return 0
}
Das Echo-Protokoll ist dazu gedacht, Latenzen in einem Netzwerk zu messen. Auf der Client-Seite senden wir also nicht nur eine Nachricht und warten auf die Antwort, sondern messen auch die Zeit vom Absenden bis zum Empfang der Antwort. Durch den Code zur Zeiterfassung fällt der Echo-Client etwas unübersichtlicher aus als der Server, die Funktionsweise ist aber simpel: Mit »ipc_endpoint_send()« senden wir eine Nachricht an »pub/echo« und nehmen dann mit »ipc_endpoint_recv()« die Antwort entgegen.
Falls der Echo-Server nicht läuft, wollen wir nicht ewig auf eine Antwort warten. Wir weisen »ipc_endpoint_recv()« daher an, höchstens 30 Sekunden auf eine Nachricht zu warten. Mit »date +”%s%N”« und der Funktion »get_delay()« ermitteln wir die aktuelle Zeit in Nanosekunden beziehungsweise geben die Differenz zwischen zwei Zeitpunkten in für Menschen lesbarer Form aus. Der Auszug in Listing 10 enthält diese Funktion nicht, Sie können Sie aber dem Begleitmaterial zu diesem Beitrag entnehmen.
Listing 10
echo-client.sh (Auszug)
main() {
local endp
local message
local -i time_sent
local -i time_received
local -i err
opt_add "m" "message" "v" "Hallo Welt!" "Zu sendende Nachricht"
if ! opt_parse "$@"; then
return 1
fi
message=$(opt_get "message")
if ! endp=$(ipc_endpoint_open); then
return 1
fi
time_sent=$(date +"%s%N")
err=1
if ipc_endpoint_send "$endp" "pub/echo" "$message"; then
local response
echo "Warte auf Antwort von Echo-Server..."
if response=$(ipc_endpoint_recv "$endp" 30); then
time_received=$(date +"%s%N")
ipc_msg_get_data "$response" | log_highlight "Antwort"
echo "Dauer: $(get_delay "$time_sent" "$time_received")"
err=0
else
log_error "Timeout oder Fehler beim Empfang"
fi
fi
ipc_endpoint_close "$endp"
return "$err"
}
In zwei separaten Terminals starten wir nun zunächst den Echo-Server und dann den Echo-Client. Sofern die Benutzergruppe und Verzeichnisse korrekt erstellt wurden, sollte das Ergebnis dem in Abbildung 1 gezeigten entsprechen. Ein Round-Trip, also der Weg vom Client zum Server und zurück hat dort etwas über eine Sekunde gedauert, trotz einer Nutzlast von nur wenigen Bytes. Der Austausch von Nachrichten funktioniert zwar wie beabsichtigt, zur Implementierung von hochperformanten Systemen eignet sich unser Framework aber – wie zu erwarten war – nicht.
Viele Vorteile
Mit unserem neuen Modul können wir nun zwischen Bash-Skripten Nachrichten austauschen, ähnlich wie in anderen Sprachen. Da unsere Endpoints im Dateisystem gespeichert sind, hat unser Modul anderen Implementierungen gegenüber sogar mehrere Vorteile. Zum einen gehen erhaltene Nachrichten, die der Empfänger noch nicht verarbeitet hat, bei einem Neustart des Empfängers (egal, ob Skript oder gesamtes System) nicht verloren. Solange der empfangende Endpoint existiert, spielt es keinerlei Rolle, ob der Empfängerprozess überhaupt läuft. Sobald er »ipc_endpoint_recv()« aufruft, erhält er auch die Nachrichten, die der Endpoint in seiner Abwesenheit erhalten hat.
Dadurch, dass mehrere Prozesse gleichzeitig auf denselben Endpoint zugreifen können, ergeben sich über das Senden und Empfangen von Nachrichten hinaus praktische Eigenschaften. Nehmen mehrere Prozesse gleichzeitig auf derselben öffentlichen Adresse – und somit auf demselben Endpoint – Nachrichten entgegen, werden die Nachrichten von der Queue der Reihe nach an die Empfänger ausgegeben, und wir erhalten eine Art naives Load Balancing. So können wir Verarbeitungsschritte ohne viel Aufwand parallelisieren.
Das setzt allerdings voraus, dass der gesamte zum Verarbeiten einer Nachricht nötige Kontext in der Nachricht selbst enthalten ist, da wir nicht davon ausgehen können, dass zwei Nachrichten von demselben Prozess verarbeitet werden. Dieses Kommunikationsmodell eignet sich besonders für Microservices, also für Architekturen, bei denen viele kleine Prozesse kleinere Aufgaben unabhängig voneinander erledigen.
Pubs und Subs
Für flexible Messaging-basierte Architekturen fehlt uns aber noch ein Feature, das wir ergänzen wollen: Publish-Subscribe-Messaging (oft PubSub-Messaging oder nur kurz PubSub genannt). Anders als beim bisher behandelten Point-to-Point-Messaging werden Nachrichten dabei nicht an einen Empfänger adressiert, sondern an ein Topic (also ein Thema). Um Nachrichten zu empfangen, muss ein Empfänger ein Topic abonnieren.
Nehmen wir als Beispiel ein System, das die Daten eines Temperatursensors verarbeitet. Ein Sensorprozess liest in regelmäßigen Abständen einen Sensor aus und sendet die Messwerte an einen Anzeigeprozess, der die Werte auf einem Display ausgibt. Wenn wir dieses System mit Point-to-Point-Messaging implementieren, muss der Sensorprozess die Adresse des Anzeigeprozesses kennen, da er sonst keine Nachrichten versenden kann.
Angenommen, wir wollen in dieses System nun einen Prozess integrieren, der bei kritischen Temperaturen eine E-Mail an den Administrator sendet. Bei diesem Ansatz ist es nicht damit getan, den neuen Prozess zu starten, da der Sensorprozess ihn nicht kennt und ihm daher keine Nachrichten zukommen lässt. Obwohl der neue Prozess vom Sensorprozess abhängt, muss in dieser Architektur der Sensorprozess verändert werden, damit das System funktioniert.
Da beim PubSub-Messaging die Nachrichten nicht direkt an Endpunkte gesendet werden, entfällt die Notwendigkeit, Adressen von einzelnen Prozessen zu konfigurieren. In unserem Beispiel würde der Sensorprozess die Messwerte also einfach unter einem Topic wie »temperatur« veröffentlichen und hätte damit seine Arbeit getan. Die anderen Prozesse würden ihrerseits das Topic »temperatur« abonnieren und empfangene Nachrichten wie gehabt verarbeiten. Wenn wir zu diesem System nun einen neuen Prozess für Benachrichtigungen per IRC hinzufügen, muss der neue Prozess lediglich das Topic »temperatur« abonnieren, und schon ist er in das System integriert. Da die Topics auf diese Weise die einzelnen Prozesse des Systems entkoppeln, muss man das bestehende System nicht mehr verändern, um neue Prozesse hinzuzufügen. PubSub-Messaging erlaubt es daher, sehr flexible Architekturen zu implementieren.
Obwohl PubSub eine erhebliche Erweiterung unseres Moduls darstellt, kommen nur drei Funktionen zur API hinzu: Mit »ipc_endpoint_subscribe()« und »ipc_endpoint_unsubscribe()« lässt sich ein Topic abonnieren beziehungsweise ein Abonnement kündigen. Zum Veröffentlichen von Nachrichten dient die neue Funktion »ipc_endpoint_publish()«, deren Gegenstück die bestehende Funktion »ipc_endpoint_recv()« bildet. Letztere lässt sich unverändert einsetzen.
Um PubSub zu implementieren, müssen wir dem Modul nur Topics hinzufügen. Ein Topic ist letzten Endes eine Liste der Endpoints, die zu einem Thema Nachrichten empfangen wollen. Wir implementieren Topics als Unterverzeichnisse von »/var/lib/bms/ipc/pubsub/«, die wir wie die anderen Verzeichnisse mit dem Setgid-Bit versehen müssen. Abonniert ein Endpoint ein Topic, erstellen wir in dem Verzeichnis des Topics einen Link auf den Endpoint. Veröffentlicht ein Prozess eine Nachricht zu einem Topic, muss er lediglich eine Nachricht an jeden Link im Verzeichnis des Topics senden. Zum Kündigen eines Abonnements genügt es, den entsprechenden Link aus dem Verzeichnis des Topics zu entfernen.
Die Funktion »ipc_endpoint_subscribe()« führt also zwei Schritte aus: Zuerst erstellt sie das Topic, falls es nicht bereits existiert, und danach die nötigen Symlinks, um das Topic zu abonnieren. Wir implementieren sie daher wie in Listing 11 gezeigt durch Aufrufe der zwei Hilfsfunktionen »_ipc_topic_create()« und »_ipc_endpoint_topic_subscribe()«. Erstere erstellt das Topic-Verzeichnis und setzt mit »chmod g+rwxs« die nötigen Berechtigungen. Da diese Funktion nicht sehr kompliziert ist, finden Sie sie nur im Begleitmaterial zu diesem Artikel.
Listing 11
Topics abonnieren
_ipc_endpoint_topic_subscribe() {
local endpoint="$1"
local topic="$2"
local topicdir
local subscription
topicdir="$__ipc_root/pubsub/$topic"
subscription="$topicdir/${endpoint//\//_}"
if ! ln -sf "$endpoint" "$subscription"; then
return 1
fi
if ! ln -sfn "$topicdir" "$__ipc_root/$endpoint/subscriptions/$topic"; then
rm -f "$subscription"
return 1
fi
return 0
}
ipc_endpoint_subscribe() {
local endpoint="$1"
local topic="$2"
if ! _ipc_topic_create "$topic" ||
! _ipc_endpoint_topic_subscribe "$endpoint" "$topic"; then
return 1
fi
return 0
}
Die zweite Funktion ist zwar nicht viel komplizierter, doch der Teufel steckt hier im Detail. Zum einen enthalten Namen von Endpunkten Schrägstriche, die wir mit der String-Expansion »${endpoint//\//_}« in Unterstriche umwandeln müssen. Zum anderen erstellen wir nicht nur im Verzeichnis des Topics einen Link auf den Endpunkt, sondern umgekehrt auch im Verzeichnis des Endpunkts einen Link auf den Topic. So können wir beim Schließen eines Endpunkts ohne viel Aufwand alle Abonnements entfernen.
Doch hier stellt Ln uns eine Falle: Wenn der Link bereits existiert, dereferenziert es ihn. Zeigt er auf ein Verzeichnis, wie es hier der Fall ist, erstellt Ln einen neuen Link in dem Verzeichnis. Es würde also ein zweiter Link im Verzeichnis des Topics entstehen, der auf das Topic selbst zeigt. Da der Link keinen Endpunkt referenziert, würde er beim Versenden von Nachrichten Fehler verursachen. Dem zweiten Aufruf von Ln müssen wir daher ein »-n« mitgeben und es damit anweisen, Links nicht zu dereferenzieren.
Im Kündigen von Abonnements verbirgt sich keine derartige Falle. Die Funktion »ipc_endpoint_unsubscribe()« muss lediglich die zwei Links entfernen, die den Endpoint mit dem Topic verbinden.
Wenn ein Skript eine Nachricht unter einem Topic veröffentlicht, benutzt es die Funktion »ipc_endpoint_publish()«. Diese Funktion ruft zunächst »_ipc_topic_create()« auf, um sicherzustellen, dass das Topic existiert. Anschließend iteriert sie über alle Endpunkte, die das Topic abonniert haben, und sendet jedem einzelnen eine Nachricht. Die Abonnenten finden wir mit der Hilfsfunktion »_ipc_topic_get_subscribers()«, die nichts anderes tut, als alle Symlinks in dem Topic mit »find -type l« ausfindig zu machen und mit »readlink« ihre Ziele auszugeben. Zum Senden der Nachrichten verwenden wir die bereits bekannte Funktion »ipc_endpoint_send()« (Listing 12).
Listing 12
PubSub-Messages versenden
_ipc_topic_get_subscribers() {
local topic="$1"
if ! find "$__ipc_root/pubsub/$topic" -mindepth 1 -maxdepth 1 -type l -exec readlink {} \; ; then
return 1
fi
return 0
}
ipc_endpoint_publish() {
local endpoint="$1"
local topic="$2"
local message="$3"
local -i err
local subscriber
err=0
if ! _ipc_topic_create "$topic"; then
return 1
fi
while read -r subscriber; do
if ! ipc_endpoint_send "$endpoint" "$subscriber" "$message" "$topic"; then
err=1
fi
done < <(_ipc_topic_get_subscribers "$topic")
return "$err"
}
Als aufmerksamen Lesern sind Ihnen bei den neuen Funktionen sicher zwei Punkte aufgefallen. Der erste ist das Verzeichnis »subscriptions/«, in dem wir die Abonnements eines Endpunkts festhalten. Damit das funktioniert, muss das Verzeichnis selbstverständlich beim Aufruf von »ipc_endpoint_open()« erstellt werden. Indem wir »/subscriptions« an den Pfad im Aufruf »mkdir -p “$endpoint”« anhängen, sorgen wir dafür, dass das Verzeichnis gleich miterzeugt wird.
Der zweite Punkt: Beim Aufruf von »ipc_endpoint_send()« ist ein neuer, optionaler Parameter hinzugekommen. Mit ihm kann ein Aufrufer den Namen eines Topics übergeben und so dem JSON-Objekt hinzufügen. Das ist erforderlich, damit ein Empfänger, der mehr als ein Topic abonniert hat, erkennen kann, zu welchem davon eine Nachricht gehört. Die nötigen Änderungen bleiben aber überschaubar. In »ipc_endpoint_send()« reichen wir das neue Argument lediglich an »_ipc_msg_new()« durch. Falls der Topic gesetzt wurde, fügt letztere Funktion es unter dem Namen »topic« in das JSON-Objekt ein.
PubSub im Test
Zum Schluss wollen wir wissen, ob unsere PubSub-Erweiterung wie erwartet funktioniert. Dazu implementieren wir erneut das Temperatursensorsystem, das zuvor schon als Beispiel herhalten musste (Listing 13). Da die verfügbaren Sensoren je nach Plattform variieren, teilt unser Sensorprozess alle 5 Sekunden die Variable »$RANDOM« durch 100 und veröffentlicht den so simulierten Messwert unter dem Topic »temperatur«. Falls vorhanden, kann man auch einen der Temperatursensoren per Sysfs auslesen. Auf dem System des Autors lässt sich zum Beispiel mit »cat /sys/class/thermal/thermal_zone1/temp« die CPU-Temperatur in Milligrad Celsius ermitteln.
Listing 13
temp-sensor.sh (Auszug)
main() {
local sensor
local endp
if ! endp=$(ipc_endpoint_open); then
return 1
fi
while true; do
ipc_endpoint_publish "$endp" "temperatur" "$((RANDOM % 100))"
sleep 5
done
ipc_endpoint_close "$endp"
return 0
}
Den Anzeigeprozess implementieren wir wie in Listing 14 zu sehen. Wir öffnen einen Endpunkt und abonnieren das Topic »temperatur«. Anschließend warten wir in einer Schleife auf Nachrichten und geben deren Inhalt auf der Standardausgabe aus.
Listing 14
temp-anzeige.sh (Auszug)
main() {
local endp
if ! endp=$(ipc_endpoint_open); then
return 1
fi
if ipc_endpoint_subscribe "$endp" "temperatur"; then
local msg
while msg=$(ipc_endpoint_recv "$endp"); do
ipc_msg_get_data "$msg"
done
fi
ipc_endpoint_close "$endp"
return 0
}
Wie gehabt, starten wir nun die beiden Skripte in zwei separaten Terminals und beobachten deren Ausgabe. Während der Sensorprozess keine Ausgabe erzeugt, sollte der Anzeigeprozess im Fünf-Sekunden-Takt Messwerte ausgeben (Abbildung 2).
Fazit
Ob Point-to-Point oder Publish-Subscribe: Mit dem IPC-Modul können wir Shell-Skripte im Handumdrehen miteinander sprechen lassen. Für welche Variante wir uns entscheiden, hängt von der Architektur des Systems ab, das wir bauen.
In dieser Serie wollen wir nicht nur ein flexibles System entwickeln, sondern auch die Fähigkeiten der Bash möglichst beeindruckend zur Schau stellen. Wir werden deshalb den Fokus auf asynchrone Kommunikation mit PubSub-Messaging legen. Doch bevor wir uns auf den ersten Prozess unseres Systems stürzen, müssen wir noch einen Schritt zurück machen: Wir benötigen auch noch ein Modul zum Entwickeln von Daemon-Prozessen. Darum geht es im nächsten Beitrag dieser Serie. (jcb)





