From 3039e31d088462ecc4b82d888153479204885df3 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Tue, 1 Oct 2024 14:03:28 -0700 Subject: [PATCH 1/3] add counter wf --- .../controller/CounterWorkflowController.java | 55 ++++++++++++++ .../workflow/update/CounterWorkflow.java | 71 +++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 src/main/java/io/iworkflow/controller/CounterWorkflowController.java create mode 100644 src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java diff --git a/src/main/java/io/iworkflow/controller/CounterWorkflowController.java b/src/main/java/io/iworkflow/controller/CounterWorkflowController.java new file mode 100644 index 0000000..aacc03a --- /dev/null +++ b/src/main/java/io/iworkflow/controller/CounterWorkflowController.java @@ -0,0 +1,55 @@ +package io.iworkflow.controller; + +import io.iworkflow.core.Client; +import io.iworkflow.core.ClientSideException; +import io.iworkflow.gen.models.ErrorSubStatus; +import io.iworkflow.workflow.microservices.ImmutableSignupForm; +import io.iworkflow.workflow.signup.UserSignupWorkflow; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; + +@Controller +@RequestMapping("/signup") +public class SignupWorkflowController { + + private final Client client; + + public SignupWorkflowController( + final Client client + ) { + this.client = client; + } + + @GetMapping("/submit") + public ResponseEntity start( + @RequestParam String username, + @RequestParam String email + ) { + try { + final ImmutableSignupForm form = ImmutableSignupForm.builder() + .username(username) + .email(email) + .firstName("Test") + .lastName("Test") + .build(); + client.startWorkflow(UserSignupWorkflow.class, username, 3600, form); + } catch (ClientSideException e) { + if (e.getErrorSubStatus() != ErrorSubStatus.WORKFLOW_ALREADY_STARTED_SUB_STATUS) { + throw e; + } + return ResponseEntity.ok("username already started registry"); + } + return ResponseEntity.ok("success"); + } + + @GetMapping("/verify") + ResponseEntity verify( + @RequestParam String username) { + final UserSignupWorkflow rpcStub = client.newRpcStub(UserSignupWorkflow.class, username); + String result = client.invokeRPC(rpcStub::verify); + return ResponseEntity.ok(result); + } +} \ No newline at end of file diff --git a/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java new file mode 100644 index 0000000..3be59f7 --- /dev/null +++ b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java @@ -0,0 +1,71 @@ +package io.iworkflow.workflow.update; + +import io.iworkflow.core.Context; +import io.iworkflow.core.ObjectWorkflow; +import io.iworkflow.core.RPC; +import io.iworkflow.core.StateDef; +import io.iworkflow.core.communication.Communication; +import io.iworkflow.core.communication.CommunicationMethodDef; +import io.iworkflow.core.communication.InternalChannelDef; +import io.iworkflow.core.persistence.DataAttributeDef; +import io.iworkflow.core.persistence.Persistence; +import io.iworkflow.core.persistence.PersistenceFieldDef; +import io.iworkflow.workflow.MyDependencyService; +import io.iworkflow.workflow.microservices.SignupForm; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +@Component + +public class UserSignupWorkflow implements ObjectWorkflow { + + public static final String DA_FORM = "Form"; + + public static final String DA_Status = "Status"; + public static final String VERIFY_CHANNEL = "Verify"; + + private MyDependencyService myService; + + public UserSignupWorkflow(MyDependencyService myService) { + this.myService = myService; + } + + @Override + public List getWorkflowStates() { + return Arrays.asList( + StateDef.startingState(new SubmitState(myService)), + StateDef.nonStartingState(new VerifyState(myService)) + ); + } + + @Override + public List getPersistenceSchema() { + return Arrays.asList( + DataAttributeDef.create(SignupForm.class, DA_FORM), + DataAttributeDef.create(String.class, DA_Status) + ); + } + + @Override + public List getCommunicationSchema() { + return Arrays.asList( + InternalChannelDef.create(Void.class, VERIFY_CHANNEL) + ); + } + + // Atomically read/write/send message in RPC + @RPC( + dataAttributesLoadingType = P + ) + public String verify(Context context, Persistence persistence, Communication communication) { + String status = persistence.getDataAttribute(DA_Status, String.class); + if (status == "verified") { + return "already verified"; + } + persistence.setDataAttribute(DA_Status, "verified"); + communication.publishInternalChannel(VERIFY_CHANNEL, null); + return "done"; + } +} From 5f184e86bfe554f2c84570114716304c99c90aa7 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Tue, 1 Oct 2024 14:03:59 -0700 Subject: [PATCH 2/3] Add counter wf --- .../controller/CounterWorkflowController.java | 42 +++++---------- .../workflow/update/CounterWorkflow.java | 52 +++++-------------- 2 files changed, 26 insertions(+), 68 deletions(-) diff --git a/src/main/java/io/iworkflow/controller/CounterWorkflowController.java b/src/main/java/io/iworkflow/controller/CounterWorkflowController.java index aacc03a..fa969a6 100644 --- a/src/main/java/io/iworkflow/controller/CounterWorkflowController.java +++ b/src/main/java/io/iworkflow/controller/CounterWorkflowController.java @@ -1,10 +1,7 @@ package io.iworkflow.controller; import io.iworkflow.core.Client; -import io.iworkflow.core.ClientSideException; -import io.iworkflow.gen.models.ErrorSubStatus; -import io.iworkflow.workflow.microservices.ImmutableSignupForm; -import io.iworkflow.workflow.signup.UserSignupWorkflow; +import io.iworkflow.workflow.update.CounterWorkflow; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; @@ -12,44 +9,29 @@ import org.springframework.web.bind.annotation.RequestParam; @Controller -@RequestMapping("/signup") -public class SignupWorkflowController { +@RequestMapping("/counter") +public class CounterWorkflowController { private final Client client; - public SignupWorkflowController( + public CounterWorkflowController( final Client client ) { this.client = client; } - @GetMapping("/submit") + @GetMapping("/start") public ResponseEntity start( - @RequestParam String username, - @RequestParam String email + @RequestParam String id ) { - try { - final ImmutableSignupForm form = ImmutableSignupForm.builder() - .username(username) - .email(email) - .firstName("Test") - .lastName("Test") - .build(); - client.startWorkflow(UserSignupWorkflow.class, username, 3600, form); - } catch (ClientSideException e) { - if (e.getErrorSubStatus() != ErrorSubStatus.WORKFLOW_ALREADY_STARTED_SUB_STATUS) { - throw e; - } - return ResponseEntity.ok("username already started registry"); - } + client.startWorkflow(CounterWorkflow.class, id, 3600); return ResponseEntity.ok("success"); } - @GetMapping("/verify") - ResponseEntity verify( - @RequestParam String username) { - final UserSignupWorkflow rpcStub = client.newRpcStub(UserSignupWorkflow.class, username); - String result = client.invokeRPC(rpcStub::verify); - return ResponseEntity.ok(result); + @GetMapping("/inc") + ResponseEntity verify( + @RequestParam String id) { + final CounterWorkflow rpcStub = client.newRpcStub(CounterWorkflow.class, id); + return ResponseEntity.ok(client.invokeRPC(rpcStub::inc)); } } \ No newline at end of file diff --git a/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java index 3be59f7..80b7c9b 100644 --- a/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java +++ b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java @@ -5,67 +5,43 @@ import io.iworkflow.core.RPC; import io.iworkflow.core.StateDef; import io.iworkflow.core.communication.Communication; -import io.iworkflow.core.communication.CommunicationMethodDef; -import io.iworkflow.core.communication.InternalChannelDef; import io.iworkflow.core.persistence.DataAttributeDef; import io.iworkflow.core.persistence.Persistence; import io.iworkflow.core.persistence.PersistenceFieldDef; -import io.iworkflow.workflow.MyDependencyService; -import io.iworkflow.workflow.microservices.SignupForm; +import io.iworkflow.gen.models.PersistenceLoadingType; import org.springframework.stereotype.Component; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @Component -public class UserSignupWorkflow implements ObjectWorkflow { +public class CounterWorkflow implements ObjectWorkflow { - public static final String DA_FORM = "Form"; - - public static final String DA_Status = "Status"; - public static final String VERIFY_CHANNEL = "Verify"; - - private MyDependencyService myService; - - public UserSignupWorkflow(MyDependencyService myService) { - this.myService = myService; - } + public static final String DA_COUNT = "COUNT"; @Override public List getWorkflowStates() { - return Arrays.asList( - StateDef.startingState(new SubmitState(myService)), - StateDef.nonStartingState(new VerifyState(myService)) - ); + return new ArrayList<>(); } @Override public List getPersistenceSchema() { return Arrays.asList( - DataAttributeDef.create(SignupForm.class, DA_FORM), - DataAttributeDef.create(String.class, DA_Status) - ); - } - - @Override - public List getCommunicationSchema() { - return Arrays.asList( - InternalChannelDef.create(Void.class, VERIFY_CHANNEL) + DataAttributeDef.create(Integer.class, DA_COUNT) ); } - // Atomically read/write/send message in RPC + // Atomically read/write data attributes in RPC will use Temporal sync update features @RPC( - dataAttributesLoadingType = P + dataAttributesLoadingType = PersistenceLoadingType.PARTIAL_WITH_EXCLUSIVE_LOCK, + dataAttributesLockingKeys = {DA_COUNT} ) - public String verify(Context context, Persistence persistence, Communication communication) { - String status = persistence.getDataAttribute(DA_Status, String.class); - if (status == "verified") { - return "already verified"; - } - persistence.setDataAttribute(DA_Status, "verified"); - communication.publishInternalChannel(VERIFY_CHANNEL, null); - return "done"; + public int inc(Context context, Persistence persistence, Communication communication) { + int count = persistence.getDataAttribute(DA_COUNT, Integer.class); + count++; + persistence.setDataAttribute(DA_COUNT, count); + return count; } } From 2a0c8462826405db8f2909f83d69969a2f1706f3 Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Tue, 1 Oct 2024 14:11:06 -0700 Subject: [PATCH 3/3] npe --- .../java/io/iworkflow/workflow/update/CounterWorkflow.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java index 80b7c9b..bef47c8 100644 --- a/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java +++ b/src/main/java/io/iworkflow/workflow/update/CounterWorkflow.java @@ -39,7 +39,10 @@ public List getPersistenceSchema() { dataAttributesLockingKeys = {DA_COUNT} ) public int inc(Context context, Persistence persistence, Communication communication) { - int count = persistence.getDataAttribute(DA_COUNT, Integer.class); + Integer count = persistence.getDataAttribute(DA_COUNT, Integer.class); + if (count == null) { + count = 0; + } count++; persistence.setDataAttribute(DA_COUNT, count); return count;