diff --git a/kubernetes/k8.yaml b/kubernetes/k8.yaml
index bc64224..a09f03d 100644
--- a/kubernetes/k8.yaml
+++ b/kubernetes/k8.yaml
@@ -150,7 +150,7 @@ metadata:
name: manager
spec:
containers:
- - image: benblamey/hom-impl-2.manager:latest
+ - image: haoyuan9654/hom-impl-2.manager:latest
# image is local-only atm.
imagePullPolicy: Always
name: manager
diff --git a/manager/.idea/.gitignore b/manager/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/manager/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/manager/.idea/.name b/manager/.idea/.name
new file mode 100644
index 0000000..63c9479
--- /dev/null
+++ b/manager/.idea/.name
@@ -0,0 +1 @@
+hom-impl-2.manager
\ No newline at end of file
diff --git a/manager/.idea/compiler.xml b/manager/.idea/compiler.xml
new file mode 100644
index 0000000..659bf43
--- /dev/null
+++ b/manager/.idea/compiler.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/manager/.idea/gradle.xml b/manager/.idea/gradle.xml
new file mode 100644
index 0000000..d405dbe
--- /dev/null
+++ b/manager/.idea/gradle.xml
@@ -0,0 +1,18 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/manager/.idea/jarRepositories.xml b/manager/.idea/jarRepositories.xml
new file mode 100644
index 0000000..fdc392f
--- /dev/null
+++ b/manager/.idea/jarRepositories.xml
@@ -0,0 +1,20 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/manager/.idea/misc.xml b/manager/.idea/misc.xml
new file mode 100644
index 0000000..564e332
--- /dev/null
+++ b/manager/.idea/misc.xml
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/manager/Dockerfile b/manager/Dockerfile
index c1743e4..f7af69e 100644
--- a/manager/Dockerfile
+++ b/manager/Dockerfile
@@ -1,4 +1,6 @@
-FROM ubuntu:hirsute
+FROM ubuntu:jammy
+# clean and update sources
+RUN apt-get -y update
# Ubuntu 21.04
# with ubuntu:impish, get an error on apt update
@@ -13,11 +15,11 @@ ENV TZ=Europe/Stockholm
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Always run update when changing package list, see https://docs.docker.com/develop/develop-images/dockerfile_best-practices/
-RUN apt update ; echo 'editthistoforcerun5'
+#RUN apt update ; echo 'editthistoforcerun5'
# install curl
RUN apt install -y curl
-RUN apt install -y openjdk-16-jre
+RUN apt install -y openjdk-18-jre
RUN java --version
@@ -42,8 +44,8 @@ RUN curl -LO "https://dl.k8s.io/release/v1.22.3/bin/linux/amd64/kubectl"
RUN install -o root -g root -m 0755 kubectl /usr/local/bin/kubectl
RUN kubectl version --client
-COPY build/libs/hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar
+COPY hom-impl-2.manager-1.0-SNAPSHOT.jar output.jar
# /usr/lib/jvm/jdk-17/bin/java -cp output.jar com.benblamey.hom.manager.ManagerMainTest
#ENTRYPOINT ["java","-jar","output.jar"]
-ENTRYPOINT ["/bin/bash"]
\ No newline at end of file
+ENTRYPOINT ["/bin/bash"]
diff --git a/manager/build.gradle b/manager/build.gradle
index 50334ae..5ec8766 100644
--- a/manager/build.gradle
+++ b/manager/build.gradle
@@ -30,6 +30,9 @@ dependencies {
// https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple
implementation group: 'com.googlecode.json-simple', name: 'json-simple', version: '1.1.1'
+
+ // https://mvnrepository.com/artifact/com.fasterxml.jackson.dataformat/jackson-dataformat-xml
+ implementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-xml', version: '2.13.3'
}
// Create a fat JAR to use with Docker, by overriding the default configuration for 'jar':
@@ -70,4 +73,5 @@ pushDockerImage.dependsOn buildDockerImage
test {
useJUnitPlatform()
-}
\ No newline at end of file
+}
+
diff --git a/manager/src/main/java/com/benblamey/hom/manager/InputTier.java b/manager/src/main/java/com/benblamey/hom/manager/InputTier.java
index 276fdbb..d874bab 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/InputTier.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/InputTier.java
@@ -8,7 +8,7 @@
import java.util.Map;
// Placeholder for existin Kafka stream repr. the input source for the system.
-public class InputTier extends Tier {
+public class InputTier extends Tier {
Logger logger = LoggerFactory.getLogger(InputTier.class);
static final int tierId = 0;
diff --git a/manager/src/main/java/com/benblamey/hom/manager/Manager.java b/manager/src/main/java/com/benblamey/hom/manager/Manager.java
index bfe3901..004e2dc 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/Manager.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/Manager.java
@@ -1,11 +1,16 @@
package com.benblamey.hom.manager;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.io.*;
import java.util.*;
+
+
public class Manager {
Logger logger = LoggerFactory.getLogger(Manager.class);
@@ -64,6 +69,13 @@ public void addJexlTier(String jexlExpression) throws IOException, InterruptedEx
String inputTopic = m_tiers.isEmpty() ? "haste-input-data" : m_tiers.get(m_tiers.size() - 1).getOutputTopic();
int tierIndex = m_tiers.size();
Tier tier = new JexlDeploymentTier(jexlExpression, tierIndex, inputTopic);
+ TierSerialization ts = new TierSerialization();
+ //re-serialize the current tiers
+ ts.removeOldTiersXml();
+ for (int i = 0; i < m_tiers.size(); i++) {
+ Tier tier1 = m_tiers.get(i);
+ ts.serializeTier(tier1);
+ }
m_tiers.add(tier);
}
@@ -77,6 +89,13 @@ public void addNotebookTier(String filenameAndFunction) throws IOException, Inte
int tierIndex = m_tiers.size();
Tier tier = new PyWorkerDeploymentTier(filenameAndFunction, tierIndex, inputTopic);
m_tiers.add(tier);
+ TierSerialization ts = new TierSerialization();
+ //re-serialize the current tiers
+ ts.removeOldTiersXml();
+ for (int i = 0; i < m_tiers.size(); i++) {
+ Tier tier1 = m_tiers.get(i);
+ ts.serializeTier(tier1);
+ }
}
public void removeTier() throws IOException, InterruptedException {
@@ -85,18 +104,77 @@ public void removeTier() throws IOException, InterruptedException {
}
Tier tier = m_tiers.get(m_tiers.size() - 1);
-
tier.remove();
// TODO - remove old kafka data?
m_tiers.remove(tier);
+ TierSerialization ts = new TierSerialization();
+ //re-serialize the current tiers
+ ts.removeOldTiersXml();
+ for (int i = 0; i < m_tiers.size(); i++) {
+ Tier tier1 = m_tiers.get(i);
+ ts.serializeTier(tier1);
+ }
}
public void addBaseTier(String topicID) {
if (!getTiers().isEmpty()) {
throw new RuntimeException("Can only add base tier if no existing tiers");
}
+ TierSerialization ts = new TierSerialization();
Tier t = new InputTier(topicID);
+ ts.serializeTier(t);
m_tiers.add(t);
}
+
+ //deserialize tiers when the manager is restarted
+ public void deserializeTiers() {
+ if (m_tiers.isEmpty()) {
+ try {
+ FileInputStream fis = new FileInputStream("serializedTiers.xml");
+ Scanner sc = new Scanner(fis);
+ while(sc.hasNextLine())
+ {
+ XmlMapper xmlMapper = new XmlMapper();
+ Tier tier = xmlMapper.readValue(sc.nextLine(), Tier.class);
+ m_tiers.add(tier);
+ }
+ }catch (IOException e){
+ e.printStackTrace();
+ }
+ }
+ }
}
+
+class TierSerialization {
+ public void serializeTier(Tier tier) {
+ try {
+ String xmlStr = null;
+ XmlMapper xmlMapper = new XmlMapper();
+ //String useDir = System.getProperty("user.dir");
+ xmlStr = xmlMapper.writeValueAsString(tier);
+ FileWriter fileWriter = new FileWriter("serializedTiers.xml",true);
+ PrintWriter printWriter = new PrintWriter(fileWriter);
+ printWriter.println(xmlStr);
+ printWriter.close();
+
+ }catch (IOException e){
+ e.printStackTrace();
+ }
+ }
+
+ public void removeOldTiersXml() {
+ try {
+ File f = new File("serializedTiers.xml");
+ if (f.delete()){
+ System.out.println(f.getName()+" deleted!");
+ }
+ else {
+ System.out.println("Failed!");
+ }
+ }catch (Exception e){
+ e.printStackTrace();
+ }
+ }
+}
+
diff --git a/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java b/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java
index ef589ba..485ae37 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/ManagerMainREST.java
@@ -1,5 +1,6 @@
package com.benblamey.hom.manager;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
@@ -7,6 +8,7 @@
import org.slf4j.LoggerFactory;
import spark.Response;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -32,6 +34,7 @@ public static void main(final String[] args) throws Exception {
spark.Spark.get("/", (req, res) -> {
logger.info("/");
+ manager.deserializeTiers();
return "The API is running.";
});
diff --git a/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java b/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java
index c93fb19..dc1199f 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/NotebookScraper.java
@@ -17,15 +17,15 @@ public static List getFunctions(String directory) throws IOException, In
"bash",
"-ec",
// Unescaped:
- // grep --extended-regexp --only-matching "^\s*\"\s*def ([^(])+\(" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/"
- "grep --extended-regexp --only-matching \"^\\s*\\\"def ([^(])+\\(\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""
- }, new File(directory), null, true).stdOut;
+ // grep --extended-regexp --only-matching "^\s*\"\s*def\s*[_a-zA-Z]+\w*\([_a-zA-Z]+\w*\)" *.ipynb | sed -E "s/(.+):\s+\"def (.+)\(/\1,\2/"
+ "grep --extended-regexp --only-matching \"^\\s*\\\"def\\s*[_a-zA-Z]+\\w*\\([_a-zA-Z]+\\w*\\)\" *.ipynb | sed -E \"s/(.+):\\s+\\\"def (.+)\\(/\\1::\\2/\""
+ }, new File(directory), null, true).stdOut;
return Arrays.stream(stdOut.split("\n")).toList();
}
// For testing...
public static void main(String[] args) throws IOException, InterruptedException {
- getFunctions("/Users/benblamey/projects/github-me/hom-impl-2/persistentvolume");
+ getFunctions("C:\\Users\\Savior_Hn\\Desktop\\HASTE-o-MATIC-main\\HASTE-o-MATIC-main\\persistentvolume");
}
}
diff --git a/manager/src/main/java/com/benblamey/hom/manager/Offsets.java b/manager/src/main/java/com/benblamey/hom/manager/Offsets.java
index ba83f4c..1ed4025 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/Offsets.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/Offsets.java
@@ -75,8 +75,8 @@ static List fetchOffsets() {
// System.out.println(result);
List offsetInfos = Arrays.stream(result.split("\\n"))
.filter(line -> line.startsWith("app-hom-tier-"))
- .map(line -> Arrays.stream(line.split("\s+")).toList())
- .map(parts -> new OffsetInfo(parts))
+ .map(line -> Arrays.stream(line.split("\\s+")).toList())
+ .map(parts -> new OffsetInfo((List) parts))
.toList();
return offsetInfos;
}
diff --git a/manager/src/main/java/com/benblamey/hom/manager/Tier.java b/manager/src/main/java/com/benblamey/hom/manager/Tier.java
index f5d28c0..4544ba2 100644
--- a/manager/src/main/java/com/benblamey/hom/manager/Tier.java
+++ b/manager/src/main/java/com/benblamey/hom/manager/Tier.java
@@ -1,10 +1,14 @@
package com.benblamey.hom.manager;
+import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement;
+
import java.io.IOException;
import java.util.Map;
+@JacksonXmlRootElement(localName = "tier")
public abstract class Tier {
+
String outputTopic;
String uniqueTierId;
String friendlyTierId;
diff --git a/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java b/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java
new file mode 100644
index 0000000..bedc32c
--- /dev/null
+++ b/manager/src/main/java/com/benblamey/hom/manager/TierSerialization.java
@@ -0,0 +1,12 @@
+package com.benblamey.hom.manager;
+
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+