Parallelisierung kann eine Anwendung beschleunigen, führt aber auch schnell zu Bugs. Petri-Netze helfen dabei, eine Queue ohne Deadlocks zu implementieren.
Im letzten Beitrag haben wir zwei Datenstrukturen kennengelernt, mit denen sich nebenläufige Prozesse synchronisieren lassen. Mit einem Mutex verhindern wir, dass zwei Prozesse gleichzeitig dieselbe Codepassage ausführen. Über Semaphoren realisieren wir einfache Warteschlangen, um Produzenten und Verbraucher von Daten zu synchronisieren. Auf Basis dieser beiden Strukturen implementieren wir in dieser Folge eine Datenstruktur, mit der wir Nachrichten zwischen Prozessen austauschen: die Queue.
|
Verteilte Systeme in der Bash (Teil 1) |
LM 06/2022, S. 76 |
|
|
Verteilte Systeme in der Bash (Teil 2) |
LM 07/2022, S. 82 |
|
|
Framework für automatisierte Tests |
LM 08/2022, S. 80 |
|
|
Komfortables Logging für die Bash |
LM 09/2022, S. 78 |
|
|
Kommandozeilenparser im Eigenbau |
LM 10/2022, S. 72 |
|
|
Mutexe und Semaphoren in der Bash |
LM 11/2022, S. 80 |
|
|
Deadlock-freie Queues in der Bash |
LM 12/2022, S.82 |
<U>https://www.lm-online.de/48475<U> |
Gegenseitige Blockade
Der Rat, dass man sich vor dem Programmieren über die Struktur Gedanken machen sollte, gilt bei nebenläufigen Prozessen umso mehr. Neben den üblichen Logikfehlern lauert hier eine weitere Gefahr: Deadlocks. Der Begriff bezeichnet eine Situation, in der zwei Prozesse aufeinander warten und keiner der beiden fortfahren kann.
Angenommen, wir haben zwei Mutexe m0 und m1 sowie zwei Prozesse p0 und p1. Um seine Arbeit zu verrichten, schließt p0 erst m0 und dann m1. Zeitgleich schließt p1 erst m1 und versucht dann, den Mutex m0 zu schließen. Den hat ja aber bereits p0 geschlossen. Beide Prozesse können erst dann fortfahren, wenn der jeweils andere “seinen” Mutex öffnet, was aber nie passiert, da sich beide Prozesse blockieren. Genau wie p0 und p1 bleibt auch jeder weitere Prozess stecken, der nun versucht, einen der beiden Mutexe zu schließen.
Bei Deadlocks handelt es sich also um Logikfehler, die zwei oder mehr interagierende Prozesse auslösen. Diese Fehlerklasse tritt auf, wenn das Gedankenmodell nicht mehr stimmt, das wir von unserem Skript oder unserer Anwendung haben. Es gibt plötzlich nicht mehr nur einen einzigen, sondern mehrere Kontrollströme, die sich gegenseitig beeinflussen. Um eine Deadlock-freie Anwendung zu schreiben, brauchen wir deshalb ein Hilfsmittel, mit dem sich Programme mit mehreren Kontrollströmen veranschaulichen lassen. Hier kommen Petri-Netze ins Spiel.
Offensichtlich parallel
Petri-Netze wurden von dem Mathematiker Carl Adam Petri entwickelt, um nebenläufige Schaltvorgänge zu modellieren. Es gibt vier verschiedene Elemente, die in einem einfachen Petri-Netz vorkommen können.
Ein Platz, dargestellt durch einen Kreis, repräsentiert in einem Petri-Netz eine oder mehrere Zeilen Code; ausgenommen sind synchronisierende Funktionen wie »mutex_lock()« oder »sem_post()«. Ein Punkt in einem Platz steht für ein »Token« und verdeutlicht, dass der Code in dem Platz gerade von einem Prozess ausgeführt wird. Transitionen definieren die Bedingungen, unter denen sich Token durch das Netz bewegen können, und erscheinen in einem Petri-Netz als Rechteck in der Verbindung zwischen mehreren Plätzen. Dabei verbinden Pfeile die Transitionen und Plätze, Token bewegen sich entlang der Pfeile durch das Netz. Ein Pfeil verläuft immer zwischen einer Transition und einem Platz, niemals zwischen zwei Transitionen oder Plätzen.
Ein Petri-Netz verändert sich, sobald eine Transition ausgelöst wird. Eine Transition kann genau dann auslösen, wenn in allen eingehenden Plätzen – also allen Plätze, von denen ein Pfeil auf die Transition zeigt – mindestens ein Token vorliegt. Die Transition absorbiert dann von jedem der eingehenden Plätze ein Token und gibt eines an jeden ausgehenden Platz ab – also an die Plätze, auf die die Transition zeigt.
Die Anzahl der absorbierten und ausgegebenen Token muss dabei nicht gleich sein. Durch das Auslösen einer Transition kann sich die Gesamtzahl der Token im Netz verändern. Wenn eine Transition auslöst, so sagt man auch, dass sie feuert. In Abbildung 1 sehen Sie ein Beispiel für eine Transition vor (a) und nach (b) dem Auslösen. Die Transition in (c) kann nicht auslösen.
Rennen im Netz
Nehmen wir nun an, wir haben eine Funktion wie die in Listing 1, die einen in einer Datei gespeicherten Zähler inkrementiert. Diese Funktion können wir als Petri-Netz mit zwei Plätzen und zwei Transitionen modellieren. Der erste Platz p0 steht hier für den Code, der die Funktion aufruft und zu dem sie anschließend zurückkehrt. Da die Funktion weder Mutexe noch Semaphoren benutzt, genügt der Platz p1, um die gesamte Funktion zu modellieren. Führt man das Skript nur einmal aus, startet das Petri-Netz mit einem Token in p0. Es feuert zuerst die Transition t0, dann t1 (Abbildung 2).
Listing 1
Zähler erhöhen
inkrement() {
local datei="$1"
local -i zaehler
zaehler=$(< "$datei")
(( zaehler++ ))
echo "$zaehler" > "$datei"
}
Wird das Skript zweimal gleichzeitig ausgeführt, haben wir zunächst zwei Token in p0, und t0 feuert zuerst. Anschließend können aber sowohl t0 als auch t1 feuern, womit wir zum eigentlichen Problem kommen: Löst t1 zuerst aus, so wird die Funktion »inkrement()« nicht parallel ausgeführt, und es tritt kein Problem auf (Abbildung 3, (a)). Falls jedoch t0 zuerst feuert, befinden sich zwei Prozesse gleichzeitig in der Funktion, und es kann zu einer Race Condition kommen (Abbildung 3, (b)). Wie wir im letzten Beitrag gesehen haben, kann es durchaus passieren, dass der Zähler trotz eines zweimaligen Aufrufs von »inkrement()« nur um eins inkrementiert wird.
Befindet sich das Netz in einem Zustand, in dem mehrere Transitionen feuern können, kann man nicht voraussagen, in welcher Reihenfolge die Transitionen auslösen oder ob sie nicht sogar gleichzeitig feuern. Wir müssen das Netz deshalb so umformen, dass keine zwei Token gleichzeitig den Platz p1 betreten können.
Platz für den Mutex
Wir fügen deshalb einen neuen Platz pm ein, über den wir die Transition t0 steuern. Den neuen Platz verbinden wir mit t0 und geben ihm ein Token, das der Platz abgibt, wenn t0 feuert. Da pm nun über keine weiteren Token mehr verfügt, kann t0 nicht mehr feuern, bis pm wieder ein Token hat. Es kann also kein zweites Token nach p1 gelangen.
Auf der anderen Seite verbinden wir t1 mit pm, sodass pm sein Token zurückbekommt, wenn es von p1 nach p0 zurückkehrt. Auf diese Weise stellen wir sicher, dass nie mehr als ein Token den Platz p1 über die Transition t0 betritt. Wenig überraschend modelliert das resultierende Petri-Netz einen Mutex (Abbildung 4). Die Transition t0 steht für die Funktion »mutex_lock()« und t1 für »mutex_unlock()«. Ein Platz lässt sich also auch wie eine Variable nutzen, in diesem Fall wie der Zustand eines Mutex.
Platz für Semaphoren
Um das Bild zu vervollständigen, wollen wir jetzt noch wissen, wie wir einen Semaphor in einem Petri-Netz modellieren. Anders als im Fall von Mutexen, die gewöhnlich in derselben Funktion geschlossen und geöffnet werden, ist die Funktion, die einen Semaphor inkrementiert, selten auch die, die ihn dekrementiert. Das Petri-Netz eines Semaphors hat daher zwei Seiten, die die zwei Funktionen eines Semaphors darstellen. Die linke Seite modelliert den Aufruf von »sem_post()«, der den Zähler des Semaphors inkrementiert; die rechte Seite modelliert den Aufruf von »sem_wait()«, der den Zähler dekrementiert und den Prozess warten lässt, solange sich keine Token in dem Semaphor befinden. Dieses Verhalten erzielen wir mit einem Platz ps, der die Token von der linken zur rechten Seite des Petri-Netzes fließen lässt (Abbildung 5).
Wir versuchen nun, das Petri-Netz für eine Queue zu modellieren. Da die Queue zum Austausch von Nachrichten dient, braucht sie zwei Funktionen: »queue_put()« fügt eine Nachricht an das Ende der Queue an, »queue_get()« gibt die Nachricht vom Anfang der Queue zurück. Befinden sich keine Nachrichten in der Queue, soll die Funktion auf die Ankunft einer Nachricht warten.
Die Queue implementieren wir mit einer Datei, an deren Ende wir Nachrichten anhängen, beziehungsweise Nachrichten vom Anfang auslesen und entfernen. Da wir gleichzeitige Zugriffe auf die Datei verhindern müssen, schützen wir sie mit einem Mutex. Da »queue_get()« auf eine Nachricht warten soll, wenn die Queue leer ist, könnte man auf die Idee kommen, die Funktion mit einer Schleife zu implementieren. Es geht aber einfacher: Wir lassen »queue_put()« einen Semaphor inkrementieren, auf den »queue_get()« wartet.
Wir modellieren also auch hier zwei Funktionen in einem Petri-Netz, wobei »queue_get()« auf der linken Seite von oben nach unten und »queue_put()« auf der rechten Seite von unten nach oben fließt. Die unterschiedliche Flussrichtung ist nicht zwingend notwendig, macht das Netz aber übersichtlicher (Abbildung 6).
Das Queue-Modul
Wir wissen also, dass eine Queue neben der Datei zum Speichern der Nachrichten auch aus einem Mutex und einem Semaphor besteht. Wir implementieren eine Queue also als ein Verzeichnis, das die drei Objekte »data«, »mutex« und »sem« enthält. Das Anhängen von Nachrichten implementieren wir in der Hilfsfunktion »_queue_put_data()«. Die Hilfsfunktion »_queue_get_data()« gibt die erste Nachricht zurück und entfernt sie aus der Queue.
Wie »queue_get()« auszusehen hat, können wir nun direkt an dem Petri-Netz aus Abbildung 6 ablesen. Mit einem Aufruf von »sem_wait()« sorgen wir zunächst dafür, dass die Queue eine Nachricht enthält. Anschließend stellen wir mit »mutex_lock()« sicher, dass wir die Queue für uns haben, und entnehmen ihr mittels »_queue_get_data()« die Nachricht. Über »mutex_unlock()« geben wir die Queue wieder frei und lassen die Nachricht auf der Standardausgabe erscheinen. Da »sem_wait()« dem Aufrufer erlaubt, einen Timeout zu definieren, gestatten wir auch dem Aufrufer von »queue_get()«, im zweiten Parameter einen Timeout an die Funktion zu übergeben. Die fertige Funktion »queue_get()« sehen Sie in Listing 2.
Listing 2
queue_get()
queue_get() {
local queue="$1"
local -i timeout="${2--1}"
local data
local -i err
if ! sem_wait "$queue/sem" "$timeout"; then
return 1
fi
mutex_lock "$queue/mutex"
data=$(_queue_get_data "$queue")
err="$?"
mutex_unlock "$queue/mutex"
if (( err == 0 )); then
printf '%s\n' "$data"
fi
return "$err"
}
Der Code von »queue_put()« lässt sich ebenfalls direkt aus dem Petri-Netz ablesen, wobei wir dank der Symmetrie bereits erahnen können, dass wir die Funktion »queue_get()« lediglich umdrehen müssen. Erst hängen wir die Nachricht an die Queue an, dann inkrementieren wir den Semaphor (Listing 3).
Listing 3
queue_put()
queue_put() {
local queue="$1"
local data="$2"
local -i err
mutex_lock "$queue/mutex"
_queue_put_data "$queue" "$data"
err="$?"
mutex_unlock "$queue/mutex"
sem_post "$queue/sem"
return "$err"
}
Der komplizierte Teil der Queue ist damit schon fertig. In den Hilfsfunktionen verbirgt sich allerdings noch ein wichtiges Detail. Um Nachrichten unkompliziert anhängen und entfernen zu können, wird jede Zeile in der Datei »data« als eine Nachricht behandelt. Das erlaubt uns, Standardwerkzeuge wie Head und Sed zu nutzen, erfordert aber auch, dass jede Nachricht wirklich nur eine Zeile enthält. Andererseits sollte unsere Queue die Daten des Aufrufers unabhängig vom Inhalt korrekt behandeln, weshalb wir Zeilenumbrüche aus den Nachrichten entfernen müssen.
Statt das händisch zu erledigen und dabei möglicherweise Bugs zu verursachen, wandeln wir die komplette Nachricht in ein Format um, das keine Zeilenumbrüche kennt: Base64. Das erledigt der Befehl »base64« aus den Coreutils. Ohne Optionen aufgerufen, konvertiert er die Daten auf der Standardeingabe in Base64 und gibt sie auf der Standardausgabe aus. Allerdings fügt er alle 76 Zeichen einen Zeilenumbruch ein, der die Ausgabe aufhübscht, für die Daten aber keine Bedeutung hat. Die Ausgabe dieser Zeilenumbrüche lässt sich mit der Option »-w 0« unterdrücken.
Die Umkehrfunktion »_queue_get_data()« liest mit »head -n 1« die erste Zeile aus der Queue und entfernt sie mit »sed -i ‘1d’« daraus. Über »base64 -d« wandelt der Code die Nachricht dann wieder in ihr ursprüngliches Format um. Beide Hilfsfunktionen sehen Sie in Listing 4.
Listing 4
Nachrichten in Base64
_queue_put_data() {
local queue="$1"
local data="$2"
local encoded
if ! encoded=$(base64 -w 0 <<< "$data"); then
return 1
fi
if ! echo "$encoded" >> "$queue/data"; then
return 1
fi
return 0
}
_queue_get_data() {
local queue="$1"
local encoded
if ! encoded=$(head -n 1 "$queue/data"); then
return 1
fi
if ! sed -i '1d' "$queue/data"; then
return 1
fi
if ! base64 -d <<< "$encoded"; then
return 1
fi
return 0
}
Eine Queue erzeugen wir mit der Funktion »queue_init()«, die das im ersten Argument übergebene Verzeichnis erstellt und darin den Semaphor »sem« initialisiert. Die Funktion »queue_destroy()« entfernt eine Queue samt Inhalt. Diese zwei trivialen Funktionen und den ebenso simplen Modulkonstruktor beleuchten wir hier nicht weiter. Das neue Modul speichern wir nun wie gewohnt in »/usr/local/share/bms/include/« unter dem Namen »queue.sh« ab.
Geben und Nehmen
Ein kurzes Beispiel soll veranschaulichen, wie man das neue Modul benutzt. Wir schreiben dafür zwei Skripte, die über eine Queue miteinander kommunizieren. Das erste nennen wir »empfaenger.sh«. Es soll von einer Queue namens »/tmp/msgq« Nachrichten entgegennehmen und auf der Standardausgabe ausgeben. Wenn man das Skript mit [Strg]+[C] beendet, soll es die Queue nicht im Dateisystem zurücklassen, sondern, wie es sich gehört, hinter sich aufräumen. Das Skript muss daher mit »trap« das Signal »SIGINT« abfangen und sich ordnungsgemäß beenden (Listing 5). Hier kommt auch das Timeout der Funktion »queue_get()« zum Einsatz. Es sorgt dafür, dass das Skript regelmäßig prüft, ob es sich beenden soll.
Listing 5
empfaenger.sh (Auszug)
abbruch() {
stop=1
}
main() {
declare -gi stop
if ! queue_init "/tmp/msgq"; then
return 1
fi
trap abbruch INT
stop=0
while (( stop == 0 )); do
local msg
if msg=$(queue_get "/tmp/msgq" 5); then
log_highlight "Nachricht" <<< "$msg"
fi
done
queue_destroy "/tmp/msgq"
return 0
}
Das zweite Skript »sender.sh« soll lediglich eine kurze Nachricht mit Zeilenumbruch an die Queue anhängen, ohne eine Ausgabe zu erzeugen. Wie in Listing 6 zu sehen, braucht es dazu nicht viel mehr als einen Aufruf von »queue_put()«.
Listing 6
sender.sh (Auszug)
main() {
local msg
msg=$(printf 'Hallo,\nWelt!\n')
queue_put "/tmp/msgq" "$msg"
}
In zwei separaten Terminals führen wir nun zuerst den Empfänger aus und dann den Sender. Während Letzterer keine Ausgabe erzeugt, sollte der Empfänger die Nachricht des Senders wie im linken Teil von Abbildung 7 gezeigt auf der Standardausgabe wiedergeben. Auf diese Weise können Skripte auch Nachrichten austauschen, die deutlich länger ausfallen als die aus dem Beispiel.
Ausblick
Queues erlauben uns, Nachrichten unidirektional von einem Skript an ein anderes zu senden. Beim Austausch von Nachrichten in beiden Richtungen kommen wir mit einer einzigen Queue jedoch nicht weit. Im nächsten Teil werden wir deshalb das Modul »ipc« entwickeln, das auf der Basis von Queues eine komfortable Kommunikation zwischen zwei und mehr Prozessen ermöglicht. (jcb/jlu)












