diff --git a/examples/bulk_sms.go b/examples/bulk_sms.go new file mode 100644 index 0000000..3577e83 --- /dev/null +++ b/examples/bulk_sms.go @@ -0,0 +1,2092 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "mime/multipart" + "reflect" + "regexp" + "strings" + "time" + + "github.com/gofiber/fiber/v2/middleware/session" + "github.com/oarkflow/mq/dag" + "github.com/oarkflow/mq/utils" + + "github.com/oarkflow/jet" + + "github.com/oarkflow/mq" + "github.com/oarkflow/mq/consts" +) + +func main() { + flow := dag.NewDAG("Bulk SMS Sender", "bulk-sms-sender", func(taskID string, result mq.Result) { + fmt.Printf("Bulk SMS workflow completed for task %s: %s\n", taskID, string(utils.RemoveRecursiveFromJSON(result.Payload, "html_content"))) + }, mq.WithSyncMode(true)) + + // Add Bulk SMS workflow nodes + flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG().Clone(), true) + flow.AddNode(dag.Page, "Bulk SMS Form", "BulkSMSForm", &BulkSMSFormNode{}) + flow.AddNode(dag.Function, "Parse Bulk Data", "ParseBulkData", &ParseBulkDataNode{}) + flow.AddNode(dag.Function, "Validate Bulk Data", "ValidateBulkData", &ValidateBulkDataNode{}) + flow.AddNode(dag.Function, "Send Bulk SMS", "SendBulkSMS", &SendBulkSMSNode{}) + flow.AddNode(dag.Page, "Bulk SMS Result", "BulkSMSResult", &BulkSMSResultNode{}) + flow.AddNode(dag.Page, "Bulk SMS Error", "BulkSMSError", &BulkSMSErrorNode{}) + + // Define edges for Bulk SMS workflow + flow.AddEdge(dag.Simple, "Login to Form", "login", "BulkSMSForm") + flow.AddEdge(dag.Simple, "Form to Parse", "BulkSMSForm", "ParseBulkData") + flow.AddEdge(dag.Simple, "Parse to Validate", "ParseBulkData", "ValidateBulkData") + flow.AddCondition("ValidateBulkData", map[string]string{"valid": "SendBulkSMS"}) + flow.AddCondition("SendBulkSMS", map[string]string{"sent": "BulkSMSResult", "failed": "BulkSMSError"}) + + // Start the flow + if flow.Error != nil { + panic(flow.Error) + } + + fmt.Println("Starting Bulk SMS DAG server on http://0.0.0.0:8084") + fmt.Println("Navigate to the URL to access the bulk SMS form") + flow.Start(context.Background(), "0.0.0.0:8084") +} + +// loginSubDAG creates a login sub-DAG with page for authentication +func loginSubDAG() *dag.DAG { + login := dag.NewDAG("Login Sub DAG", "login-sub-dag", func(taskID string, result mq.Result) { + fmt.Printf("Login Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) + }, mq.WithSyncMode(true)) + + login. + AddNode(dag.Page, "Login Page", "login-page", &BulkLoginPage{}). + AddNode(dag.Function, "Verify Credentials", "verify-credentials", &BulkVerifyCredentials{}). + AddNode(dag.Function, "Generate Token", "generate-token", &BulkGenerateToken{}). + AddEdge(dag.Simple, "Login to Verify", "login-page", "verify-credentials"). + AddEdge(dag.Simple, "Verify to Token", "verify-credentials", "generate-token") + + return login +} + +// BulkSMSFormNode - Initial form to collect bulk SMS data +type BulkSMSFormNode struct { + dag.Operation +} + +func (s *BulkSMSFormNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if len(task.Payload) > 0 { + json.Unmarshal(task.Payload, &inputData) + } + if inputData == nil { + inputData = make(map[string]any) + } + + // Show the form (either initial load or with validation errors) + htmlTemplate := ` + + + + Bulk SMS Sender + + + +
+

📨 Bulk SMS Sender

+
+

Send SMS messages to multiple recipients using CSV or JSON data

+
+ + {{if validation_error}} +
+ ⚠️ Validation Error: {{validation_error}} +
+ {{end}} + +
+
+ + +
+ +
+ + +
+ Use placeholders like {first_name}, {last_name}, {phone} to personalize messages.
+ Example: "Hi {first_name}, welcome to our platform! Your phone: {phone}" +
+
+ +
+
+ + +
+ +
+
+ + +
+ +
+ + +
+ CSV format: first_name,last_name,phone (header required)
+ JSON format: [{"first_name": "John", "last_name": "Doe", "phone": "1234567890"}] +
+
+ +
+

📋 Sample Formats:

+
+ CSV: +
first_name,last_name,phone
+John,Doe,1234567890
+Jane,Smith,1987654321
+Bob,Johnson,1555123456
+
+ +
+
+ +
+
+ +
+ +
+ 📁 Click to select file or drag and drop
+ Supported formats: .csv, .json (Max 10MB) +
+
+
+
+ +
+

📋 Expected File Formats:

+ CSV File (recipients.csv): +
first_name,last_name,phone
+John,Doe,1234567890
+Jane,Smith,1987654321
+
+ JSON File (recipients.json): +
[
+  {"first_name": "John", "last_name": "Doe", "phone": "1234567890"},
+  {"first_name": "Jane", "last_name": "Smith", "phone": "1987654321"}
+]
+
+
+
+ + +
+
+ + + +` + + messageStr, _ := inputData["bulk_data"].(string) + dataFormat, _ := inputData["data_format"].(string) + messageTemplate, _ := inputData["message_template"].(string) + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(htmlTemplate, map[string]any{ + "task_id": ctx.Value("task_id"), + "validation_error": inputData["validation_error"], + "bulk_data": messageStr, + "sender_name": inputData["sender_name"], + "message_template": messageTemplate, + "data_format": dataFormat, + "data_format_csv": dataFormat == "csv", + "data_format_json": dataFormat == "json", + }) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + data := map[string]any{ + "html_content": rs, + "step": "form", + } + bt, _ := json.Marshal(data) + return mq.Result{Payload: bt, Ctx: ctx} +} + +// ParseBulkDataNode - Parses CSV/JSON data from form submission +type ParseBulkDataNode struct { + dag.Operation +} + +func (p *ParseBulkDataNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + // Try to parse as JSON first + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{ + Error: fmt.Errorf("ParseBulkData Error: %s", err.Error()), + Ctx: ctx, + } + } + + var recipients []map[string]any + var parseError string + + // Handle pasted data + bulkData, _ := inputData["bulk_data"].(string) + dataFormat, _ := inputData["data_format"].(string) + messageTemplate, _ := inputData["message_template"].(string) + + fmt.Printf("DEBUG ParseBulkDataNode: dataFormat='%s', bulkData length=%d, messageTemplate='%s'\n", dataFormat, len(bulkData), messageTemplate) + if len(bulkData) > 0 { + previewLen := 100 + if len(bulkData) < previewLen { + previewLen = len(bulkData) + } + fmt.Printf("DEBUG ParseBulkDataNode: bulkData preview: %s...\n", bulkData[:previewLen]) + } + + if bulkData == "" { + parseError = "No data provided" + } else { + var rawRecipients []map[string]any + if dataFormat == "json" { + // Try JSON first + rawRecipients, parseError = parseRecipientDataJSON(bulkData) + if parseError != "" { + // JSON parsing failed, try CSV as fallback + rawRecipients, parseError = parseRecipientDataCSV(bulkData) + if parseError == "" { + parseError = "Selected JSON format but data appears to be CSV. Parsed as CSV instead." + } + } + } else { + // Try CSV first + rawRecipients, parseError = parseRecipientDataCSV(bulkData) + if parseError != "" { + // CSV parsing failed, try JSON as fallback + rawRecipients, parseError = parseRecipientDataJSON(bulkData) + if parseError == "" { + parseError = "Selected CSV format but data appears to be JSON. Parsed as JSON instead." + } + } + } + + if parseError == "" && len(rawRecipients) > 0 { + // Now substitute placeholders in message template for each recipient + recipients, parseError = createMessagesFromTemplate(rawRecipients, messageTemplate) + } + } + + if bulkData == "" { + // Check if file was uploaded + if fileData, exists := inputData["file_upload"]; exists && fileData != nil { + // Handle file upload + fileContent, ok := fileData.(string) + fmt.Println("File content: ", fileContent, ok) + if !ok { + parseError = "Invalid file upload data" + } else { + // Determine file type from filename first, then try parsing + fileName, _ := inputData["file_name"].(string) + isJSONFile := strings.HasSuffix(strings.ToLower(fileName), ".json") + + if isJSONFile { + // File has .json extension, try JSON parsing + recipients, parseError = parseJSONData(fileContent) + if parseError != "" { + parseError = "File has .json extension but contains invalid JSON: " + parseError + } + } else { + // Try JSON first (for content that starts with [ or {), then CSV + trimmedContent := strings.TrimSpace(fileContent) + if strings.HasPrefix(trimmedContent, "[") || strings.HasPrefix(trimmedContent, "{") { + // Looks like JSON, try parsing as JSON + testRecipients, jsonError := parseJSONData(fileContent) + if jsonError == "" { + // Successfully parsed as JSON + recipients = testRecipients + parseError = "" + } else { + // JSON parsing failed, try CSV + recipients, parseError = parseCSVData(fileContent) + if parseError == "" { + parseError = "Content appears to be JSON but parsing failed (" + jsonError + "), successfully parsed as CSV instead" + } + } + } else { + // Doesn't look like JSON, try CSV + recipients, parseError = parseCSVData(fileContent) + } + } + } + } + } + + if parseError != "" { + inputData["validation_error"] = "Data parsing error: " + parseError + bt, _ := json.Marshal(inputData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "BulkSMSForm", + } + } + + // Store parsed recipients + inputData["recipients"] = recipients + inputData["total_recipients"] = len(recipients) + delete(inputData, "html_content") + + updatedPayload, _ := json.Marshal(inputData) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +// parseMultipartFormData parses multipart form data and returns form fields as map +func parseMultipartFormData(data []byte, boundary string) (map[string]any, error) { + reader := multipart.NewReader(bytes.NewReader(data), boundary) + formData := make(map[string]any) + + for { + part, err := reader.NextPart() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + + fieldName := part.FormName() + if fieldName == "" { + continue + } + + // Read the part data + partData, err := io.ReadAll(part) + if err != nil { + return nil, err + } + + // Handle file uploads vs regular form fields + filename := part.FileName() + if filename != "" { + // This is a file upload + formData[fieldName] = string(partData) + formData["file_name"] = filename + } else { + // This is a regular form field + formData[fieldName] = string(partData) + } + } + + return formData, nil +} + +// parseRecipientDataCSV parses CSV string and returns recipient data (without messages) +func parseRecipientDataCSV(csvData string) ([]map[string]any, string) { + lines := strings.Split(strings.TrimSpace(csvData), "\n") + if len(lines) < 2 { + return nil, "CSV must have at least a header row and one data row" + } + + // Parse header + header := strings.Split(lines[0], ",") + if len(header) < 1 { + return nil, "CSV header must contain at least a 'phone' column" + } + + // Find phone column index (required) + phoneIndex := -1 + for i, col := range header { + col = strings.TrimSpace(strings.ToLower(col)) + if col == "phone" || col == "phonenumber" || col == "number" || col == "mobile" || col == "cell" { + phoneIndex = i + break + } + } + + if phoneIndex == -1 { + return nil, "CSV must have a 'phone' column" + } + + // Parse data rows + var recipients []map[string]any + for i, line := range lines[1:] { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + // Simple CSV parsing + columns := strings.Split(line, ",") + if len(columns) <= phoneIndex { + return nil, fmt.Sprintf("Row %d: insufficient columns", i+2) + } + + recipient := make(map[string]any) + // Add all columns with their header names + for j, col := range columns { + if j < len(header) { + headerName := strings.TrimSpace(header[j]) + recipient[strings.ToLower(headerName)] = strings.TrimSpace(col) + } + } + + // Ensure phone is present + if phoneVal, exists := recipient["phone"]; !exists || phoneVal == "" { + return nil, fmt.Sprintf("Row %d: missing phone number", i+2) + } + + recipients = append(recipients, recipient) + } + + if len(recipients) == 0 { + return nil, "No valid recipients found in CSV data" + } + + return recipients, "" +} + +// parseRecipientDataJSON parses JSON string and returns recipient data (without messages) +func parseRecipientDataJSON(jsonData string) ([]map[string]any, string) { + // Clean phone numbers in the JSON string + jsonData = cleanPhoneNumbers(jsonData) + + var data any + if err := json.Unmarshal([]byte(jsonData), &data); err != nil { + return nil, "Invalid JSON: " + err.Error() + } + + var recipients []map[string]any + + switch v := data.(type) { + case []any: + // Array of objects + for i, item := range v { + if obj, ok := item.(map[string]any); ok { + recipient := make(map[string]any) + for k, v := range obj { + recipient[strings.ToLower(k)] = v + } + // Ensure phone is present + if phoneVal, exists := recipient["phone"]; !exists || phoneVal == "" { + return nil, fmt.Sprintf("Item %d: missing phone number", i+1) + } + recipients = append(recipients, recipient) + } else { + return nil, fmt.Sprintf("Item %d: not an object", i+1) + } + } + case map[string]any: + // Single object + recipient := make(map[string]any) + for k, v := range v { + recipient[strings.ToLower(k)] = v + } + // Ensure phone is present + if phoneVal, exists := recipient["phone"]; !exists || phoneVal == "" { + return nil, "Missing phone number" + } + recipients = append(recipients, recipient) + default: + return nil, "JSON must be an object or array of objects" + } + + if len(recipients) == 0 { + return nil, "No valid recipients found in JSON data" + } + + return recipients, "" +} + +// createMessagesFromTemplate creates personalized messages by substituting placeholders +func createMessagesFromTemplate(recipients []map[string]any, template string) ([]map[string]any, string) { + if template == "" { + return nil, "Message template is required" + } + + var result []map[string]any + + for _, recipient := range recipients { + message := template + + // Substitute all placeholders in the template + for key, value := range recipient { + if strValue, ok := value.(string); ok { + placeholder := "{" + key + "}" + message = strings.ReplaceAll(message, placeholder, strValue) + } + } + + // Create final recipient with phone and personalized message + finalRecipient := map[string]any{ + "phone": recipient["phone"], + "message": message, + } + + // Copy other fields for reference + for k, v := range recipient { + if k != "phone" && k != "message" { + finalRecipient[k] = v + } + } + + result = append(result, finalRecipient) + } + + return result, "" +} + +// parseJSONData parses JSON data and returns recipients with phone and message +func parseJSONData(jsonData string) ([]map[string]any, string) { + // Clean phone numbers in the JSON string + jsonData = cleanPhoneNumbers(jsonData) + + var data any + if err := json.Unmarshal([]byte(jsonData), &data); err != nil { + return nil, "Invalid JSON: " + err.Error() + } + + var recipients []map[string]any + + switch v := data.(type) { + case []any: + // Array of objects + for i, item := range v { + if obj, ok := item.(map[string]any); ok { + recipient := extractRecipientFromJSON(obj) + if recipient == nil { + return nil, fmt.Sprintf("Item %d: missing required phone or message field", i+1) + } + recipients = append(recipients, recipient) + } else { + return nil, fmt.Sprintf("Item %d: not an object", i+1) + } + } + case map[string]any: + // Single object + recipient := extractRecipientFromJSON(v) + if recipient == nil { + return nil, "Missing required phone or message field" + } + recipients = append(recipients, recipient) + default: + return nil, "JSON must be an object or array of objects" + } + + if len(recipients) == 0 { + return nil, "No valid recipients found in JSON data" + } + + return recipients, "" +} + +// parseCSVData parses CSV data and returns recipients with phone and message +func parseCSVData(csvData string) ([]map[string]any, string) { + lines := strings.Split(strings.TrimSpace(csvData), "\n") + if len(lines) < 2 { + return nil, "CSV must have at least a header row and one data row" + } + + // Parse header + header := strings.Split(lines[0], ",") + if len(header) < 2 { + return nil, "CSV header must contain at least 'phone' and 'message' columns" + } + + // Find phone and message column indices (required) + phoneIndex := -1 + messageIndex := -1 + for i, col := range header { + col = strings.TrimSpace(strings.ToLower(col)) + if col == "phone" || col == "phonenumber" || col == "number" || col == "mobile" || col == "cell" { + phoneIndex = i + } else if col == "message" || col == "msg" || col == "text" || col == "sms" || col == "content" || col == "body" { + messageIndex = i + } + } + + if phoneIndex == -1 { + return nil, "CSV must have a 'phone' column" + } + if messageIndex == -1 { + return nil, "CSV must have a 'message' column" + } + + // Parse data rows + var recipients []map[string]any + for i, line := range lines[1:] { + line = strings.TrimSpace(line) + if line == "" { + continue + } + + // Simple CSV parsing + columns := strings.Split(line, ",") + if len(columns) <= max(phoneIndex, messageIndex) { + return nil, fmt.Sprintf("Row %d: insufficient columns", i+2) + } + + phone := strings.TrimSpace(columns[phoneIndex]) + message := strings.TrimSpace(columns[messageIndex]) + + if phone == "" { + return nil, fmt.Sprintf("Row %d: missing phone number", i+2) + } + if message == "" { + return nil, fmt.Sprintf("Row %d: missing message", i+2) + } + + recipient := map[string]any{ + "phone": phone, + "message": message, + } + + recipients = append(recipients, recipient) + } + + if len(recipients) == 0 { + return nil, "No valid recipients found in CSV data" + } + + return recipients, "" +} + +// cleanPhoneNumbers removes dashes and spaces from phone numbers in JSON strings +func cleanPhoneNumbers(jsonStr string) string { + // Simple regex to remove dashes and spaces from phone number patterns + re := regexp.MustCompile(`"phone"\s*:\s*"([^"]*)"`) + return re.ReplaceAllStringFunc(jsonStr, func(match string) string { + // Extract the phone value + phoneMatch := re.FindStringSubmatch(match) + if len(phoneMatch) > 1 { + phone := phoneMatch[1] + // Remove dashes and spaces + cleaned := strings.ReplaceAll(phone, "-", "") + cleaned = strings.ReplaceAll(cleaned, " ", "") + return `"phone":"` + cleaned + `"` + } + return match + }) +} + +// extractRecipientFromJSON extracts phone and message from a JSON object +func extractRecipientFromJSON(obj map[string]any) map[string]any { + var phone, message string + + // Try different possible field names for phone + phoneFields := []string{"phone", "phonenumber", "number", "mobile", "cell"} + for _, field := range phoneFields { + if val, exists := obj[field]; exists { + if str, ok := val.(string); ok { + phone = str + break + } + } + } + + // Try different possible field names for message + messageFields := []string{"message", "msg", "text", "sms", "content", "body"} + for _, field := range messageFields { + if val, exists := obj[field]; exists { + if str, ok := val.(string); ok { + message = str + break + } + } + } + + if phone == "" || message == "" { + return nil + } + + return map[string]any{ + "phone": phone, + "message": message, + } +} + +// Helper function to get maximum of two integers +func max(a, b int) int { + if a > b { + return a + } + return b +} + +// formatPhoneForDisplay formats phone number for display +func formatPhoneForDisplay(phone string) string { + if len(phone) == 11 && strings.HasPrefix(phone, "1") { + // Format as +1 (XXX) XXX-XXXX + return fmt.Sprintf("+1 (%s) %s-%s", + phone[1:4], + phone[4:7], + phone[7:11]) + } + return phone +} + +// ValidateBulkDataNode - Validates parsed bulk SMS data +type ValidateBulkDataNode struct { + dag.Operation +} + +func (v *ValidateBulkDataNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{ + Error: fmt.Errorf("ValidateBulkData Error: %s", err.Error()), + Ctx: ctx, + } + } + fmt.Println(inputData["recipients"], reflect.TypeOf(inputData["recipients"])) + recipientsInterface, ok := inputData["recipients"].([]interface{}) + if !ok || len(recipientsInterface) == 0 { + inputData["validation_error"] = "No recipients found to validate" + bt, _ := json.Marshal(inputData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "BulkSMSForm", + } + } + + // Convert []interface{} to []map[string]any + recipients := make([]map[string]any, 0, len(recipientsInterface)) + for _, r := range recipientsInterface { + if rec, ok := r.(map[string]any); ok { + recipients = append(recipients, rec) + } else { + inputData["validation_error"] = fmt.Sprintf("Invalid recipient format: expected map[string]any, got %T", r) + bt, _ := json.Marshal(inputData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "BulkSMSForm", + } + } + } + + // Validate each recipient + validRecipients := []map[string]any{} + validationErrors := []string{} + + for i, recipient := range recipients { + phone, phoneOk := recipient["phone"].(string) + message, messageOk := recipient["message"].(string) + + if !phoneOk || phone == "" { + validationErrors = append(validationErrors, fmt.Sprintf("Row %d: Missing phone number", i+1)) + continue + } + + if !messageOk || message == "" { + validationErrors = append(validationErrors, fmt.Sprintf("Row %d: Missing message", i+1)) + continue + } + + // Validate phone number format + cleanPhone := regexp.MustCompile(`\D`).ReplaceAllString(phone, "") + if len(cleanPhone) == 10 { + cleanPhone = "1" + cleanPhone + } else if len(cleanPhone) != 11 || !strings.HasPrefix(cleanPhone, "1") { + validationErrors = append(validationErrors, fmt.Sprintf("Row %d: Invalid phone number format: %s", i+1, phone)) + continue + } + + // Validate message length + if len(message) > 160 { + validationErrors = append(validationErrors, fmt.Sprintf("Row %d: Message too long (%d chars, max 160)", i+1, len(message))) + continue + } + + // Check for prohibited content + forbiddenWords := []string{"spam", "scam", "fraud", "hack"} + messageLower := strings.ToLower(message) + hasForbidden := false + for _, word := range forbiddenWords { + if strings.Contains(messageLower, word) { + validationErrors = append(validationErrors, fmt.Sprintf("Row %d: Message contains prohibited content", i+1)) + hasForbidden = true + break + } + } + if hasForbidden { + continue + } + + // Add validated recipient + validRecipient := map[string]any{ + "phone": cleanPhone, + "message": message, + "formatted_phone": formatPhoneForDisplay(cleanPhone), + "char_count": len(message), + "original_index": i + 1, + } + validRecipients = append(validRecipients, validRecipient) + } + + if len(validationErrors) > 0 { + inputData["validation_error"] = "Validation failed:\n" + strings.Join(validationErrors, "\n") + bt, _ := json.Marshal(inputData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "BulkSMSForm", + } + } + + if len(validRecipients) == 0 { + inputData["validation_error"] = "No valid recipients found after validation" + bt, _ := json.Marshal(inputData) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "BulkSMSForm", + } + } + + // Store validated data + inputData["valid_recipients"] = validRecipients + inputData["valid_count"] = len(validRecipients) + inputData["validation_status"] = "success" + inputData["validated_at"] = time.Now().Format("2006-01-02 15:04:05") + + bt, _ := json.Marshal(inputData) + return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "valid"} +} + +// SendBulkSMSNode - Sends SMS to multiple recipients +type SendBulkSMSNode struct { + dag.Operation +} + +func (s *SendBulkSMSNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{ + Error: fmt.Errorf("SendBulkSMS Error: %s", err.Error()), + Ctx: ctx, + } + } + + validRecipientsInterface, ok := inputData["valid_recipients"].([]interface{}) + if !ok || len(validRecipientsInterface) == 0 { + return mq.Result{ + Error: fmt.Errorf("no valid recipients found"), + Ctx: ctx, + } + } + + // Convert []interface{} to []map[string]any + validRecipients := make([]map[string]any, 0, len(validRecipientsInterface)) + for _, r := range validRecipientsInterface { + if rec, ok := r.(map[string]any); ok { + validRecipients = append(validRecipients, rec) + } else { + return mq.Result{ + Error: fmt.Errorf("invalid recipient format: expected map[string]any, got %T", r), + Ctx: ctx, + } + } + } + + senderName, _ := inputData["sender_name"].(string) + totalRecipients := len(validRecipients) + + // Send SMS to each recipient + results := []map[string]any{} + successCount := 0 + failureCount := 0 + + fmt.Printf("📨 Starting bulk SMS send to %d recipients...\n", totalRecipients) + + for i, recipient := range validRecipients { + phone, _ := recipient["phone"].(string) + message, _ := recipient["message"].(string) + + // Generate mock SMS ID + timestamp := time.Now() + smsID := fmt.Sprintf("BULK_SMS_%d_%s_%d", timestamp.Unix(), phone[len(phone)-4:], i+1) + + // Simulate occasional failures (90% success rate) + success := RandomBool() + + result := map[string]any{ + "phone": phone, + "formatted_phone": recipient["formatted_phone"], + "message": message, + "sender_name": senderName, + "char_count": recipient["char_count"], + "original_index": recipient["original_index"], + "sms_id": smsID, + "sent_at": timestamp.Format("2006-01-02 15:04:05"), + "delivery_estimate": "1-2 minutes", + "cost_estimate": "$0.02", + "gateway": "MockSMS Gateway", + } + + if success { + result["status"] = "sent" + successCount++ + fmt.Printf("✅ SMS sent successfully! ID: %s, Phone: %s\n", smsID, formatPhoneForDisplay(phone)) + } else { + result["status"] = "failed" + result["error_message"] = "SMS gateway temporarily unavailable" + failureCount++ + fmt.Printf("❌ SMS failed! Phone: %s, Error: gateway unavailable\n", formatPhoneForDisplay(phone)) + } + + results = append(results, result) + } + + // Calculate statistics + totalCost := float64(successCount) * 0.02 + averageCharCount := 0 + totalChars := 0 + + for _, result := range results { + if charCount, ok := result["char_count"].(int); ok { + totalChars += charCount + } + } + if len(results) > 0 { + averageCharCount = totalChars / len(results) + } + + // Prepare final result data + bulkResult := map[string]any{ + "sender_name": senderName, + "total_recipients": totalRecipients, + "success_count": successCount, + "failure_count": failureCount, + "success_rate": fmt.Sprintf("%.1f%%", float64(successCount)/float64(totalRecipients)*100), + "total_cost": fmt.Sprintf("$%.2f", totalCost), + "average_char_count": averageCharCount, + "total_char_count": totalChars, + "bulk_send_id": fmt.Sprintf("BULK_%d", time.Now().Unix()), + "completed_at": time.Now().Format("2006-01-02 15:04:05"), + "results": results, + } + + if failureCount > 0 { + bulkResult["partial_success"] = true + bulkResult["error_summary"] = fmt.Sprintf("%d of %d SMS messages failed to send", failureCount, totalRecipients) + } else { + bulkResult["full_success"] = true + } + + fmt.Printf("📊 Bulk SMS campaign completed: %d sent, %d failed, Total cost: $%.2f\n", + successCount, failureCount, totalCost) + + bt, _ := json.Marshal(bulkResult) + + if failureCount > 0 { + return mq.Result{ + Payload: bt, + Ctx: ctx, + ConditionStatus: "failed", + } + } + + return mq.Result{ + Payload: bt, + Ctx: ctx, + ConditionStatus: "sent", + } +} + +// RandomBool returns true or false randomly +func RandomBool() bool { + rand.Seed(time.Now().UnixNano()) + return rand.Intn(2) == 1 +} + +// BulkSMSResultNode - Shows successful bulk SMS result +type BulkSMSResultNode struct { + dag.Operation +} + +func (r *BulkSMSResultNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + } else { + inputData = make(map[string]any) + } + + htmlTemplate := ` + + + + Bulk SMS Results + + + +
+

📊 Bulk SMS Campaign Results

+ + {{if full_success}} +
+

✅ Campaign Completed Successfully!

+

All SMS messages were sent successfully.

+
+ {{end}} + + {{if partial_success}} +
+

⚠️ Partial Success

+

{{error_summary}}

+
+ {{end}} + +
+
+
{{total_recipients}}
+
Total Recipients
+
+
+
{{success_count}}
+
Successfully Sent
+
+
+
{{failure_count}}
+
Failed
+
+
+
{{success_rate}}
+
Success Rate
+
+
+
{{total_cost}}
+
Total Cost
+
+
+
{{average_char_count}}
+
Avg Characters
+
+
+ +
+ Campaign Details:
+ Bulk Send ID: {{bulk_send_id}}
+ Completed At: {{completed_at}}
+ {{if sender_name}}Sender: {{sender_name}}
{{end}} + Gateway: MockSMS Gateway +
+ +

📋 Detailed Results

+ + + + + + + + + + + + + + {{range $index, $result := results}} + + + + + + + + + + {{end}} + +
#Phone NumberStatusSMS IDSent AtCharactersCost
{{$result.original_index}}{{$result.formatted_phone}} + {{if eq $result.status "sent"}}✅ Sent{{else}}❌ Failed{{end}} + {{$result.sms_id}}{{$result.sent_at}}{{$result.char_count}}{{if eq $result.status "sent"}}$0.02{{else}}-$0.00{{end}}
+ +
+ 📱 Send Another Bulk SMS + 📊 View Metrics + +
+
+ + + +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + + // Convert results to JSON for JavaScript + resultsJSON, _ := json.Marshal(inputData["results"]) + inputData["results_json"] = string(resultsJSON) + + rs, err := parser.ParseTemplate(htmlTemplate, inputData) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + finalData := map[string]any{ + "html_content": rs, + "result": inputData, + "step": "success", + } + bt, _ := json.Marshal(finalData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +// BulkSMSErrorNode - Shows bulk SMS errors +type BulkSMSErrorNode struct { + dag.Operation +} + +func (e *BulkSMSErrorNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var inputData map[string]any + if err := json.Unmarshal(task.Payload, &inputData); err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + // Determine error type and message + errorMessage, _ := inputData["error_message"].(string) + if errorMessage == "" { + errorMessage = "An unknown error occurred during bulk SMS processing" + } + + htmlTemplate := ` + + + + Bulk SMS Error + + + +
+
+

Bulk SMS Error

+ +
+ {{error_message}} +
+ + {{if partial_success}} +
+ ⚠️ Partial Success: Some SMS messages were sent successfully, but others failed.
+ Successful: {{success_count}} | Failed: {{failure_count}} +
+ {{end}} + +
+ 🔄 Try Again + 📊 Check Status +
+ +
+ DAG Error Handler | Bulk SMS Workflow Failed +
+
+ +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + templateData := map[string]any{ + "error_message": errorMessage, + "partial_success": inputData["partial_success"], + "success_count": inputData["success_count"], + "failure_count": inputData["failure_count"], + } + + rs, err := parser.ParseTemplate(htmlTemplate, templateData) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + finalData := map[string]any{ + "html_content": rs, + "error_data": inputData, + "step": "error", + } + bt, _ := json.Marshal(finalData) + return mq.Result{Payload: bt, Ctx: ctx} +} + +type BulkLoginPage struct { + dag.Operation +} + +func (p *BulkLoginPage) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + // Check if user is already authenticated via session + if sess, ok := ctx.Value("session").(*session.Session); ok { + if authenticated, exists := sess.Get("authenticated").(bool); exists && authenticated { + // User is already authenticated, show auto-submit page + htmlContent := ` + + + + + + Phone Processing System - Already Authenticated + + + +
+
+
+

Already Authenticated

+

Welcome back!

+
+
+

Username: ` + fmt.Sprintf("%v", sess.Get("username")) + `

+

Role: ` + fmt.Sprintf("%v", sess.Get("user_role")) + `

+
+
+ Redirecting in 1 second... +
+
+ + + + + +` + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": htmlContent, + "step": "authenticated", + } + resultPayload, _ := json.Marshal(resultData) + return mq.Result{ + Payload: resultPayload, + Ctx: ctx, + } + } + } + + // Check if this is a form submission + var inputData map[string]any + if len(task.Payload) > 0 { + if err := json.Unmarshal(task.Payload, &inputData); err == nil { + // Check if we have form data (username/password) + if formData, ok := inputData["form"].(map[string]any); ok { + // This is a form submission, pass it through for verification + credentials := map[string]any{ + "username": formData["username"], + "password": formData["password"], + } + inputData["credentials"] = credentials + updatedPayload, _ := json.Marshal(inputData) + return mq.Result{Payload: updatedPayload, Ctx: ctx} + } + } + } + + // Otherwise, show the form + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + data = make(map[string]any) + } + + // HTML content for login page + htmlContent := ` + + + + + + Phone Processing System - Login + + + +
+
+

📱 Phone Processing System

+

Please login to continue

+
+ {{if error}} +
+ ❌ Login Failed: {{error}} +
+ {{end}} +
+
+ + +
+
+ + +
+ +
+
+
+ + + +` + + parser := jet.NewWithMemory(jet.WithDelims("{{", "}}")) + rs, err := parser.ParseTemplate(htmlContent, map[string]any{ + "task_id": ctx.Value("task_id"), + "error": data["error"], + "username": data["username"], + }) + if err != nil { + return mq.Result{Error: err, Ctx: ctx} + } + + ctx = context.WithValue(ctx, consts.ContentType, consts.TypeHtml) + resultData := map[string]any{ + "html_content": rs, + "step": "login", + "data": data, + } + + resultPayload, _ := json.Marshal(resultData) + return mq.Result{ + Payload: resultPayload, + Ctx: ctx, + } +} + +type BulkVerifyCredentials struct { + dag.Operation +} + +func (p *BulkVerifyCredentials) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("VerifyCredentials Error: %s", err.Error()), Ctx: ctx} + } + + // Check if user is already authenticated via session + if sess, ok := ctx.Value("session").(*session.Session); ok { + if authenticated, exists := sess.Get("authenticated").(bool); exists && authenticated { + // User is already authenticated, restore session data + data["authenticated"] = true + data["username"] = sess.Get("username") + data["user_role"] = sess.Get("user_role") + data["auth_token"] = sess.Get("auth_token") + delete(data, "html_content") + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} + } + } + + username, _ := data["username"].(string) + password, _ := data["password"].(string) + + // Simple verification logic + if username == "admin" && password == "password123" { + data["authenticated"] = true + data["user_role"] = "administrator" + } else { + data["authenticated"] = false + data["error"] = "Invalid credentials" + data["validation_error"] = "Phone number is required" + data["error_field"] = "phone" + bt, _ := json.Marshal(data) + return mq.Result{ + Payload: bt, + Ctx: ctx, + ResetTo: "back", // Reset to form instead of going to error page + } + } + delete(data, "html_content") + updatedPayload, _ := json.Marshal(data) + return mq.Result{Payload: updatedPayload, Ctx: ctx} +} + +type BulkGenerateToken struct { + dag.Operation +} + +func (p *BulkGenerateToken) ProcessTask(ctx context.Context, task *mq.Task) mq.Result { + var data map[string]any + if err := json.Unmarshal(task.Payload, &data); err != nil { + return mq.Result{Error: fmt.Errorf("GenerateToken Error: %s", err.Error()), Ctx: ctx} + } + + if authenticated, ok := data["authenticated"].(bool); ok && authenticated { + data["auth_token"] = "jwt_token_123456789" + data["token_expires"] = "2025-09-19T13:00:00Z" + data["login_success"] = true + + // Store authentication in session + if sess, ok := ctx.Value("session").(*session.Session); ok { + sess.Set("authenticated", true) + sess.Set("username", data["username"]) + sess.Set("user_role", data["user_role"]) + sess.Set("auth_token", data["auth_token"]) + sess.Save() + } + } + + delete(data, "html_content") + updatedPayload, _ := json.Marshal(data) + return mq.Result{ + Payload: updatedPayload, + Ctx: ctx, + Status: mq.Completed, + } +} diff --git a/examples/form.go b/examples/form.go index 5269f34..9c06245 100644 --- a/examples/form.go +++ b/examples/form.go @@ -27,7 +27,7 @@ func main() { // Add SMS workflow nodes // Note: Page nodes have no timeout by default, allowing users unlimited time for form input - flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAG().Clone(), true) + flow.AddDAGNode(dag.Page, "Login", "login", loginSubDAGSingle().Clone(), true) flow.AddNode(dag.Page, "SMS Form", "SMSForm", &SMSFormNode{}) flow.AddNode(dag.Function, "Validate Input", "ValidateInput", &ValidateInputNode{}) flow.AddNode(dag.Function, "Send SMS", "SendSMS", &SendSMSNode{}) @@ -50,8 +50,8 @@ func main() { flow.Start(context.Background(), "0.0.0.0:8083") } -// loginSubDAG creates a login sub-DAG with page for authentication -func loginSubDAG() *dag.DAG { +// loginSubDAGSingle creates a login sub-DAG with page for authentication +func loginSubDAGSingle() *dag.DAG { login := dag.NewDAG("Login Sub DAG", "login-sub-dag", func(taskID string, result mq.Result) { fmt.Printf("Login Sub DAG Final result for task %s: %s\n", taskID, string(result.Payload)) }, mq.WithSyncMode(true)) @@ -751,7 +751,7 @@ func (v *ValidateInputNode) ProcessTask(ctx context.Context, task *mq.Task) mq.R "sender_name": senderName, "validated_at": time.Now().Format("2006-01-02 15:04:05"), "validation_status": "success", - "formatted_phone": formatPhoneForDisplay(cleanPhone), + "formatted_phone": formatPhoneForDisplaySingle(cleanPhone), "char_count": len(message), } @@ -804,7 +804,7 @@ func (s *SendSMSNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result resultData := map[string]any{ "phone": phone, - "formatted_phone": formatPhoneForDisplay(phone), + "formatted_phone": formatPhoneForDisplaySingle(phone), "message": message, "sender_name": senderName, "sms_status": "sent", @@ -816,7 +816,7 @@ func (s *SendSMSNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Result "char_count": len(message), } - fmt.Printf("📱 SMS sent successfully! ID: %s, Phone: %s\n", smsID, formatPhoneForDisplay(phone)) + fmt.Printf("📱 SMS sent successfully! ID: %s, Phone: %s\n", smsID, formatPhoneForDisplaySingle(phone)) bt, _ := json.Marshal(resultData) return mq.Result{Payload: bt, Ctx: ctx, ConditionStatus: "sent"} @@ -1151,7 +1151,7 @@ func (e *ErrorPageNode) ProcessTask(ctx context.Context, task *mq.Task) mq.Resul } // Helper function to format phone number for display -func formatPhoneForDisplay(phone string) string { +func formatPhoneForDisplaySingle(phone string) string { if len(phone) == 11 && strings.HasPrefix(phone, "1") { // Format as +1 (XXX) XXX-XXXX return fmt.Sprintf("+1 (%s) %s-%s", diff --git a/go.mod b/go.mod index a5f4bde..e371faa 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/oarkflow/dipper v0.0.6 github.com/oarkflow/errors v0.0.6 github.com/oarkflow/expr v0.0.11 - github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43 + github.com/oarkflow/form v0.0.1 github.com/oarkflow/jet v0.0.4 github.com/oarkflow/json v0.0.28 github.com/oarkflow/jsonschema v0.0.4 diff --git a/go.sum b/go.sum index ac74d15..9fd1dad 100644 --- a/go.sum +++ b/go.sum @@ -41,16 +41,14 @@ github.com/oarkflow/errors v0.0.6 h1:qTBzVblrX6bFbqYLfatsrZHMBPchOZiIE3pfVzh1+k8 github.com/oarkflow/errors v0.0.6/go.mod h1:UETn0Q55PJ+YUbpR4QImIoBavd6QvJtyW/oeTT7ghZM= github.com/oarkflow/expr v0.0.11 h1:H6h+dIUlU+xDlijMXKQCh7TdE6MGVoFPpZU7q/dziRI= github.com/oarkflow/expr v0.0.11/go.mod h1:WgMZqP44h7SBwKyuGZwC15vj46lHtI0/QpKdEZpRVE4= -github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43 h1:AjNCAnpzDi6BYVUfXUUuIdWruRu4npSSTrR3eZ6Vppw= -github.com/oarkflow/form v0.0.0-20241203111156-b1be5636af43/go.mod h1:fYwqhq8Sig9y0cmgO6q6WN8SP/rrsi7h2Yyk+Ufrne8= +github.com/oarkflow/form v0.0.1 h1:N1eZg/OT9islr2M9Tb1sk3D/sShjl1m1gEaZEX+Bxe4= +github.com/oarkflow/form v0.0.1/go.mod h1:fYwqhq8Sig9y0cmgO6q6WN8SP/rrsi7h2Yyk+Ufrne8= github.com/oarkflow/jet v0.0.4 h1:rs0nTzodye/9zhrSX7FlR80Gjaty6ei2Ln0pmaUrdwg= github.com/oarkflow/jet v0.0.4/go.mod h1:YXIc47aYyx1xKpnmuz1Z9o88cxxa47r7X3lfUAxZ0Qg= github.com/oarkflow/json v0.0.28 h1:pCt7yezRDJeSdSu2OZ6Aai0F4J9qCwmPWRsCmfaH8Ds= github.com/oarkflow/json v0.0.28/go.mod h1:E6Mg4LoY1PHCntfAegZmECc6Ux24sBpXJAu2lwZUe74= github.com/oarkflow/jsonschema v0.0.4 h1:n5Sb7WVb7NNQzn/ei9++4VPqKXCPJhhsHeTGJkIuwmM= github.com/oarkflow/jsonschema v0.0.4/go.mod h1:AxNG3Nk7KZxnnjRJlHLmS1wE9brtARu5caTFuicCtnA= -github.com/oarkflow/log v1.0.83 h1:T/38wvjuNeVJ9PDo0wJDTnTUQZ5XeqlcvpbCItuFFJo= -github.com/oarkflow/log v1.0.83/go.mod h1:dMn57z9uq11Y264cx9c9Ac7ska9qM+EBhn4qf9CNlsM= github.com/oarkflow/log v1.0.84 h1:MdYV6NgSV9/38J6yLAPXICk1HHAZRwQGb1Bk9Dmxb30= github.com/oarkflow/log v1.0.84/go.mod h1:dMn57z9uq11Y264cx9c9Ac7ska9qM+EBhn4qf9CNlsM= github.com/oarkflow/squealx v0.0.56 h1:8rPx3jWNnt4ez2P10m1Lz4HTAbvrs0MZ7jjKDJ87Vqg= @@ -81,8 +79,6 @@ golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= -golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= -golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI= golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=