Browse Source

feat: multiprocessing execution

Ahmad Kholid 3 years ago
parent
commit
d812344e78

+ 3 - 2
src/background/index.js

@@ -148,9 +148,10 @@ checkWorkflowStates();
 async function checkVisitWebTriggers(changeInfo, tab) {
   if (!changeInfo.status || changeInfo.status !== 'complete') return;
 
-  const tabIsUsed = await workflow.states.get(
-    ({ state }) => state.activeTab.id === tab.id
+  const tabIsUsed = await workflow.states.get(({ state }) =>
+    state.tabIds.includes(tab.id)
   );
+
   if (tabIsUsed) return;
 
   const visitWebTriggers = await storage.get('visitWebTriggers');

+ 8 - 6
src/background/workflow-engine/blocks-handler/handler-blocks-group.js

@@ -14,7 +14,9 @@ function blocksGroup({ data, outputs }, { prevBlockData }) {
     }
 
     const blocks = data.blocks.reduce((acc, block, index) => {
-      let nextBlock = data.blocks[index + 1]?.itemId;
+      let nextBlock = {
+        connections: [{ node: data.blocks[index + 1]?.itemId }],
+      };
 
       if (index === data.blocks.length - 1) {
         nextBlock = nextBlockId;
@@ -25,20 +27,20 @@ function blocksGroup({ data, outputs }, { prevBlockData }) {
         id: block.itemId,
         name: block.id,
         outputs: {
-          output_1: {
-            connections: [{ node: nextBlock }],
-          },
+          output_1: nextBlock,
         },
       };
 
       return acc;
     }, {});
 
-    Object.assign(this.blocks, blocks);
+    Object.assign(this.engine.blocks, blocks);
 
     resolve({
       data: prevBlockData,
-      nextBlockId: data.blocks[0].itemId,
+      nextBlockId: {
+        connections: [{ node: data.blocks[0].itemId }],
+      },
     });
   });
 }

+ 10 - 8
src/background/workflow-engine/blocks-handler/handler-delete-data.js

@@ -5,24 +5,26 @@ function deleteData({ data, outputs }) {
     data.deleteList.forEach((item) => {
       if (item.type === 'table') {
         if (item.columnId === '[all]') {
-          this.referenceData.table = [];
-          this.columns = { column: { index: 0, name: 'column', type: 'any' } };
+          this.engine.referenceData.table = [];
+          this.engine.columns = {
+            column: { index: 0, name: 'column', type: 'any' },
+          };
         } else {
-          const columnName = this.columns[item.columnId].name;
+          const columnName = this.engine.columns[item.columnId].name;
 
-          this.referenceData.table.forEach((_, index) => {
-            const row = this.referenceData.table[index];
+          this.engine.referenceData.table.forEach((_, index) => {
+            const row = this.engine.referenceData.table[index];
             delete row[columnName];
 
             if (!row || Object.keys(row).length === 0) {
-              this.referenceData.table[index] = {};
+              this.engine.referenceData.table[index] = {};
             }
           });
 
-          this.columns[item.columnId].index = 0;
+          this.engine.columns[item.columnId].index = 0;
         }
       } else if (item.variableName) {
-        delete this.referenceData.variables[item.variableName];
+        delete this.engine.referenceData.variables[item.variableName];
       }
     });
 

+ 7 - 7
src/background/workflow-engine/blocks-handler/handler-execute-workflow.js

@@ -57,8 +57,8 @@ async function executeWorkflow({ outputs, data }) {
           globalData: isWhitespace(data.globalData) ? null : data.globalData,
         },
         parentWorkflow: {
-          id: this.id,
-          name: this.workflow.name,
+          id: this.engine.id,
+          name: this.engine.workflow.name,
         },
       },
       events: {
@@ -70,7 +70,7 @@ async function executeWorkflow({ outputs, data }) {
             const { dataColumns, globalData, googleSheets, table } =
               engine.referenceData;
 
-            this.referenceData.workflow[data.executeId] = {
+            this.engine.referenceData.workflow[data.executeId] = {
               globalData,
               dataColumns,
               googleSheets,
@@ -79,12 +79,12 @@ async function executeWorkflow({ outputs, data }) {
           }
         },
       },
-      states: this.states,
-      logger: this.logger,
-      blocksHandler: this.blocksHandler,
+      states: this.engine.states,
+      logger: this.engine.logger,
+      blocksHandler: this.engine.blocksHandler,
     };
 
-    if (workflow.drawflow.includes(this.workflow.id)) {
+    if (workflow.drawflow.includes(this.engine.workflow.id)) {
       throw new Error('workflow-infinite-loop');
     }
 

+ 4 - 4
src/background/workflow-engine/blocks-handler/handler-export-data.js

@@ -2,17 +2,17 @@ import browser from 'webextension-polyfill';
 import { default as dataExporter, files } from '@/utils/data-exporter';
 import { getBlockConnection } from '../helper';
 
-async function exportData({ data, outputs }) {
+async function exportData({ data, outputs }, { refData }) {
   const nextBlockId = getBlockConnection({ outputs });
 
   try {
     const dataToExport = data.dataToExport || 'data-columns';
-    let payload = this.referenceData.table;
+    let payload = refData.table;
 
     if (dataToExport === 'google-sheets') {
-      payload = this.referenceData.googleSheets[data.refKey] || [];
+      payload = refData.googleSheets[data.refKey] || [];
     } else if (dataToExport === 'variable') {
-      payload = this.referenceData.variables[data.variableName] || [];
+      payload = refData.variables[data.variableName] || [];
 
       if (!Array.isArray(payload)) {
         payload = [payload];

+ 3 - 3
src/background/workflow-engine/blocks-handler/handler-google-sheets.js

@@ -60,7 +60,7 @@ async function updateSpreadsheetValues(
   }
 }
 
-export default async function ({ data, outputs }) {
+export default async function ({ data, outputs }, { refData }) {
   const nextBlockId = getBlockConnection({ outputs });
 
   try {
@@ -76,10 +76,10 @@ export default async function ({ data, outputs }) {
       result = spreadsheetValues;
 
       if (data.refKey && !isWhitespace(data.refKey)) {
-        this.referenceData.googleSheets[data.refKey] = spreadsheetValues;
+        refData.googleSheets[data.refKey] = spreadsheetValues;
       }
     } else if (data.type === 'update') {
-      result = await updateSpreadsheetValues(data, this.referenceData.table);
+      result = await updateSpreadsheetValues(data, refData.table);
     }
 
     return {

+ 3 - 3
src/background/workflow-engine/blocks-handler/handler-handle-dialog.js

@@ -4,7 +4,7 @@ function handleDialog({ data, outputs }) {
   const nextBlockId = getBlockConnection({ outputs });
 
   return new Promise((resolve, reject) => {
-    if (!this.workflow.settings.debugMode) {
+    if (!this.settings.debugMode) {
       const error = new Error('not-debug-mode');
       error.nextBlockId = nextBlockId;
 
@@ -18,8 +18,8 @@ function handleDialog({ data, outputs }) {
     };
 
     const methodName = 'Page.javascriptDialogOpening';
-    if (!this.eventListeners[methodName]) {
-      this.on(methodName, () => {
+    if (!this.engine.eventListeners[methodName]) {
+      this.engine.on(methodName, () => {
         sendDebugCommand(
           this.activeTab.id,
           'Page.handleJavaScriptDialog',

+ 1 - 1
src/background/workflow-engine/blocks-handler/handler-handle-download.js

@@ -71,7 +71,7 @@ function handleDownload({ data, outputs }) {
     };
 
     const handleChanged = ({ state, id, filename }) => {
-      if (this.isDestroyed || isResolved) {
+      if (this.engine.isDestroyed || isResolved) {
         browser.downloads.onChanged.removeListener(handleChanged);
         return;
       }

+ 1 - 1
src/background/workflow-engine/blocks-handler/handler-hover-element.js

@@ -6,7 +6,7 @@ export async function hoverElement(block) {
   try {
     if (!this.activeTab.id) throw new Error('no-tab');
 
-    const { debugMode, executedBlockOnWeb } = this.workflow.settings;
+    const { debugMode, executedBlockOnWeb } = this.settings;
 
     if (!debugMode) {
       await attachDebugger(this.activeTab.id);

+ 1 - 1
src/background/workflow-engine/blocks-handler/handler-interaction-block.js

@@ -39,7 +39,7 @@ async function interactionHandler(block) {
       (block.data.getValue && block.data.saveData)
     ) {
       const currentColumnType =
-        this.columns[block.data.dataColumn]?.type || 'any';
+        this.engine.columns[block.data.dataColumn]?.type || 'any';
       const insertDataToColumn = (value) => {
         this.addDataToColumn(block.data.dataColumn, value);
 

+ 1 - 1
src/background/workflow-engine/blocks-handler/handler-loop-breakpoint.js

@@ -24,7 +24,7 @@ function loopBreakpoint(block, { prevBlockData }) {
       });
     } else {
       delete this.loopList[block.data.loopId];
-      delete this.referenceData.loopData[block.data.loopId];
+      delete this.engine.referenceData.loopData[block.data.loopId];
 
       resolve({
         data: prevBlockData,

+ 9 - 10
src/background/workflow-engine/blocks-handler/handler-loop-data.js

@@ -1,7 +1,7 @@
 import { parseJSON } from '@/utils/helper';
 import { getBlockConnection } from '../helper';
 
-async function loopData({ data, id, outputs }) {
+async function loopData({ data, id, outputs }, { refData }) {
   const nextBlockId = getBlockConnection({ outputs });
 
   try {
@@ -13,25 +13,24 @@ async function loopData({ data, id, outputs }) {
       let currentLoopData;
 
       if (data.loopThrough === 'numbers') {
-        currentLoopData = this.referenceData.loopData[data.loopId].data + 1;
+        currentLoopData = refData.loopData[data.loopId].data + 1;
       } else {
         currentLoopData = this.loopList[data.loopId].data[index];
       }
 
-      this.referenceData.loopData[data.loopId] = {
+      refData.loopData[data.loopId] = {
         data: currentLoopData,
         $index: index,
       };
     } else {
       const getLoopData = {
         numbers: () => data.fromNumber,
-        table: () => this.referenceData.table,
+        table: () => refData.table,
         'custom-data': () => JSON.parse(data.loopData),
-        'data-columns': () => this.referenceData.table,
-        'google-sheets': () =>
-          this.referenceData.googleSheets[data.referenceKey],
+        'data-columns': () => refData.table,
+        'google-sheets': () => refData.googleSheets[data.referenceKey],
         variable: () => {
-          const variableVal = this.referenceData.variables[data.variableName];
+          const variableVal = refData.variables[data.variableName];
 
           return parseJSON(variableVal, variableVal);
         },
@@ -75,7 +74,7 @@ async function loopData({ data, id, outputs }) {
             : data.maxLoop || currLoopData.length,
       };
       /* eslint-disable-next-line */
-      this.referenceData.loopData[data.loopId] = {
+      refData.loopData[data.loopId] = {
         data:
           data.loopThrough === 'numbers'
             ? data.fromNumber
@@ -88,7 +87,7 @@ async function loopData({ data, id, outputs }) {
 
     return {
       nextBlockId,
-      data: this.referenceData.loopData[data.loopId],
+      data: refData.loopData[data.loopId],
     };
   } catch (error) {
     error.nextBlockId = nextBlockId;

+ 2 - 2
src/background/workflow-engine/blocks-handler/handler-new-tab.js

@@ -45,7 +45,7 @@ async function newTab(block) {
 
     this.activeTab.url = url;
     if (tab) {
-      if (this.workflow.settings.debugMode || customUserAgent) {
+      if (this.settings.debugMode || customUserAgent) {
         await attachDebugger(tab.id, this.activeTab.id);
 
         if (customUserAgent) {
@@ -80,7 +80,7 @@ async function newTab(block) {
 
     this.activeTab.frameId = 0;
 
-    if (!this.workflow.settings.debugMode && customUserAgent) {
+    if (!this.settings.debugMode && customUserAgent) {
       chrome.debugger.detach({ tabId: tab.id });
     }
 

+ 1 - 1
src/background/workflow-engine/blocks-handler/handler-switch-tab.js

@@ -32,7 +32,7 @@ export default async function ({ data, outputs }) {
     await browser.tabs.update(tab.id, { active: true });
   }
 
-  if (this.workflow.settings.debugMode) {
+  if (this.settings.debugMode) {
     await attachDebugger(tab.id, this.activeTab.id);
   }
 

+ 40 - 289
src/background/workflow-engine/engine.js

@@ -1,17 +1,8 @@
 import browser from 'webextension-polyfill';
 import { nanoid } from 'nanoid';
 import { tasks } from '@/utils/shared';
-import {
-  clearCache,
-  toCamelCase,
-  sleep,
-  parseJSON,
-  isObject,
-  objectHasKey,
-} from '@/utils/helper';
-import referenceData from '@/utils/reference-data';
-import { convertData, waitTabLoaded, getBlockConnection } from './helper';
-import executeContentScript from './execute-content-script';
+import { clearCache, sleep, parseJSON, isObject } from '@/utils/helper';
+import Worker from './worker';
 
 class WorkflowEngine {
   constructor(
@@ -26,13 +17,7 @@ class WorkflowEngine {
     this.parentWorkflow = parentWorkflow;
     this.saveLog = workflow.settings?.saveLog ?? true;
 
-    this.loopList = {};
-    this.repeatedTasks = {};
-
-    this.windowId = null;
-    this.triggerBlock = null;
-    this.currentBlock = null;
-    this.childWorkflowId = null;
+    this.workers = new Map();
 
     this.isDestroyed = false;
     this.isUsingProxy = false;
@@ -58,13 +43,6 @@ class WorkflowEngine {
     }
     this.options = options;
 
-    this.activeTab = {
-      url: '',
-      frameId: 0,
-      frames: {},
-      groupId: null,
-      id: options?.tabId,
-    };
     this.referenceData = {
       variables,
       table: [],
@@ -87,38 +65,6 @@ class WorkflowEngine {
     };
   }
 
-  reset() {
-    this.loopList = {};
-    this.repeatedTasks = {};
-
-    this.windowId = null;
-    this.currentBlock = null;
-    this.childWorkflowId = null;
-
-    this.isDestroyed = false;
-    this.isUsingProxy = false;
-
-    this.history = [];
-    this.preloadScripts = [];
-    this.columns = { column: { index: 0, name: 'column', type: 'any' } };
-
-    this.activeTab = {
-      url: '',
-      frameId: 0,
-      frames: {},
-      groupId: null,
-      id: this.options?.tabId,
-    };
-    this.referenceData = {
-      table: [],
-      loopData: {},
-      workflow: {},
-      googleSheets: {},
-      variables: this.options.variables,
-      globalData: this.referenceData.globalData,
-    };
-  }
-
   init() {
     if (this.workflow.isDisabled) return;
 
@@ -174,7 +120,6 @@ class WorkflowEngine {
     this.blocks = blocks;
     this.startedTimestamp = Date.now();
     this.workflow.table = columns;
-    this.currentBlock = triggerBlock;
 
     this.states.on('stop', this.onWorkflowStopped);
 
@@ -185,7 +130,7 @@ class WorkflowEngine {
         parentState: this.parentWorkflow,
       })
       .then(() => {
-        this.executeBlock(this.currentBlock);
+        this.addWorker({ blockId: triggerBlock.id });
       });
   }
 
@@ -199,6 +144,13 @@ class WorkflowEngine {
     this.init(state.currentBlock);
   }
 
+  addWorker(detail) {
+    const worker = new Worker(this);
+    worker.init(detail);
+
+    this.workers.set(worker.id, worker);
+  }
+
   addLogHistory(detail) {
     if (
       !this.saveLog &&
@@ -214,7 +166,7 @@ class WorkflowEngine {
       detail.replacedValue ||
       (tasks[detail.name]?.refDataKeys && this.saveLog)
     ) {
-      const { activeTabUrl, variables, loopData, prevBlockData } = JSON.parse(
+      const { activeTabUrl, variables, loopData } = JSON.parse(
         JSON.stringify(this.referenceData)
       );
 
@@ -223,7 +175,7 @@ class WorkflowEngine {
           loopData,
           variables,
           activeTabUrl,
-          prevBlockData,
+          prevBlockData: detail.prevBlockData || '',
         },
         replacedValue: detail.replacedValue,
       };
@@ -234,39 +186,6 @@ class WorkflowEngine {
     this.history.push(detail);
   }
 
-  addDataToColumn(key, value) {
-    if (Array.isArray(key)) {
-      key.forEach((item) => {
-        if (!isObject(item)) return;
-
-        Object.entries(item).forEach(([itemKey, itemValue]) => {
-          this.addDataToColumn(itemKey, itemValue);
-        });
-      });
-
-      return;
-    }
-
-    const columnId =
-      (this.columns[key] ? key : this.columnsId[key]) || 'column';
-    const currentColumn = this.columns[columnId];
-    const columnName = currentColumn.name || 'column';
-    const convertedValue = convertData(value, currentColumn.type);
-
-    if (objectHasKey(this.referenceData.table, currentColumn.index)) {
-      this.referenceData.table[currentColumn.index][columnName] =
-        convertedValue;
-    } else {
-      this.referenceData.table.push({ [columnName]: convertedValue });
-    }
-
-    currentColumn.index += 1;
-  }
-
-  setVariable(name, value) {
-    this.referenceData.variables[name] = value;
-  }
-
   async stop() {
     try {
       if (this.childWorkflowId) {
@@ -297,6 +216,19 @@ class WorkflowEngine {
     await browser.storage.local.set({ workflowQueue });
   }
 
+  destroyWorker(workerId) {
+    this.workers.delete(workerId);
+
+    if (this.workers.size === 0) {
+      this.addLogHistory({
+        type: 'finish',
+        name: 'finish',
+      });
+      this.dispatchEvent('finish');
+      this.destroy('success');
+    }
+  }
+
   async destroy(status, message) {
     try {
       if (this.isDestroyed) return;
@@ -311,6 +243,7 @@ class WorkflowEngine {
       }
 
       const endedTimestamp = Date.now();
+      this.workers.clear();
       this.executeQueue();
 
       if (!this.workflow.isTesting) {
@@ -371,151 +304,21 @@ class WorkflowEngine {
     }
   }
 
-  async executeBlock(block, prevBlockData, isRetry) {
-    const currentState = await this.states.get(this.id);
-
-    if (!currentState || currentState.isDestroyed) {
-      if (this.isDestroyed) return;
-
-      await this.destroy('stopped');
-      return;
-    }
-
-    this.currentBlock = block;
-    this.referenceData.prevBlockData = prevBlockData;
-    this.referenceData.activeTabUrl = this.activeTab.url || '';
-
-    if (!isRetry) {
-      await this.states.update(this.id, { state: this.state });
-      this.dispatchEvent('update', { state: this.state });
-    }
-
-    const startExecuteTime = Date.now();
-
-    const blockHandler = this.blocksHandler[toCamelCase(block.name)];
-    const handler =
-      !blockHandler && tasks[block.name].category === 'interaction'
-        ? this.blocksHandler.interactionBlock
-        : blockHandler;
-
-    if (!handler) {
-      console.error(`"${block.name}" block doesn't have a handler`);
-      this.destroy('stopped');
-      return;
-    }
-
-    const replacedBlock = referenceData({
-      block,
-      data: this.referenceData,
-      refKeys:
-        isRetry || block.data.disableBlock
-          ? null
-          : tasks[block.name].refDataKeys,
-    });
-    const blockDelay = this.workflow.settings?.blockDelay || 0;
-    const addBlockLog = (status, obj = {}) => {
-      this.addLogHistory({
-        type: status,
-        name: block.name,
-        description: block.data.description,
-        replacedValue: replacedBlock.replacedValue,
-        duration: Math.round(Date.now() - startExecuteTime),
-        ...obj,
-      });
+  async updateState(data) {
+    const state = {
+      ...this.state,
+      ...data,
+      tabIds: [],
+      currentBlock: [],
     };
 
-    try {
-      let result;
-
-      if (block.data.disableBlock) {
-        result = {
-          data: '',
-          nextBlockId: getBlockConnection(block),
-        };
-      } else {
-        result = await handler.call(this, replacedBlock, {
-          prevBlockData,
-          refData: this.referenceData,
-        });
-
-        if (result.replacedValue) {
-          replacedBlock.replacedValue = result.replacedValue;
-        }
-
-        addBlockLog(result.status || 'success', {
-          logId: result.logId,
-        });
-      }
-
-      if (result.nextBlockId) {
-        setTimeout(() => {
-          this.executeBlock(this.blocks[result.nextBlockId], result.data);
-        }, blockDelay);
-      } else {
-        this.addLogHistory({
-          type: 'finish',
-          name: 'finish',
-        });
-        this.dispatchEvent('finish');
-        this.destroy('success');
-      }
-    } catch (error) {
-      const { onError: blockOnError } = replacedBlock.data;
-      if (blockOnError && blockOnError.enable) {
-        if (blockOnError.retry && blockOnError.retryTimes) {
-          await sleep(blockOnError.retryInterval * 1000);
-          blockOnError.retryTimes -= 1;
-          await this.executeBlock(replacedBlock, prevBlockData, true);
-
-          return;
-        }
-
-        const nextBlockId = getBlockConnection(
-          block,
-          blockOnError.toDo === 'continue' ? 1 : 2
-        );
-        if (blockOnError.toDo !== 'error' && nextBlockId) {
-          this.executeBlock(this.blocks[nextBlockId], '');
-          return;
-        }
-      }
-
-      addBlockLog('error', {
-        message: error.message,
-        ...(error.data || {}),
-      });
-
-      const { onError } = this.workflow.settings;
-
-      if (onError === 'keep-running' && error.nextBlockId) {
-        setTimeout(() => {
-          this.executeBlock(this.blocks[error.nextBlockId], error.data || '');
-        }, blockDelay);
-      } else if (onError === 'restart-workflow' && !this.parentWorkflow) {
-        const restartKey = `restart-count:${this.id}`;
-        const restartCount = +localStorage.getItem(restartKey) || 0;
-        const maxRestart = this.workflow.settings.restartTimes ?? 3;
-
-        if (restartCount >= maxRestart) {
-          localStorage.removeItem(restartKey);
-          this.destroy();
-          return;
-        }
-
-        this.reset();
-
-        const triggerBlock = Object.values(this.blocks).find(
-          ({ name }) => name === 'trigger'
-        );
-        this.executeBlock(triggerBlock);
-
-        localStorage.setItem(restartKey, restartCount + 1);
-      } else {
-        this.destroy('error', error.message);
-      }
+    this.workers.forEach((worker) => {
+      state.tabIds.push(worker.activeTab.id);
+      state.currentBlock.push(worker.currentBlock);
+    });
 
-      console.error(`${block.name}:`, error);
-    }
+    await this.states.update(this.id, { state });
+    this.dispatchEvent('update', { state });
   }
 
   dispatchEvent(name, params) {
@@ -535,16 +338,7 @@ class WorkflowEngine {
   }
 
   get state() {
-    const keys = [
-      'history',
-      'columns',
-      'activeTab',
-      'isUsingProxy',
-      'currentBlock',
-      'referenceData',
-      'childWorkflowId',
-      'startedTimestamp',
-    ];
+    const keys = ['columns', 'referenceData', 'startedTimestamp'];
     const state = {
       name: this.workflow.name,
       icon: this.workflow.icon,
@@ -556,49 +350,6 @@ class WorkflowEngine {
 
     return state;
   }
-
-  async _sendMessageToTab(payload, options = {}) {
-    try {
-      if (!this.activeTab.id) {
-        const error = new Error('no-tab');
-        error.workflowId = this.id;
-
-        throw error;
-      }
-
-      await waitTabLoaded(this.activeTab.id);
-      await executeContentScript(
-        this.activeTab.id,
-        this.activeTab.frameId || 0
-      );
-
-      const { executedBlockOnWeb, debugMode } = this.workflow.settings;
-      const messagePayload = {
-        isBlock: true,
-        debugMode,
-        executedBlockOnWeb,
-        activeTabId: this.activeTab.id,
-        frameSelector: this.frameSelector,
-        ...payload,
-      };
-
-      const data = await browser.tabs.sendMessage(
-        this.activeTab.id,
-        messagePayload,
-        { frameId: this.activeTab.frameId, ...options }
-      );
-
-      return data;
-    } catch (error) {
-      if (error.message?.startsWith('Could not establish connection')) {
-        error.message = 'Could not establish connection to the active tab';
-      } else if (error.message?.startsWith('No tab')) {
-        error.message = 'active-tab-removed';
-      }
-
-      throw error;
-    }
-  }
 }
 
 export default WorkflowEngine;

+ 1 - 1
src/background/workflow-engine/helper.js

@@ -59,7 +59,7 @@ export function convertData(data, type) {
 }
 
 export function getBlockConnection(block, index = 1) {
-  const blockId = block.outputs[`output_${index}`]?.connections[0]?.node;
+  const blockId = block.outputs[`output_${index}`];
 
   return blockId;
 }

+ 329 - 0
src/background/workflow-engine/worker.js

@@ -0,0 +1,329 @@
+import { nanoid } from 'nanoid';
+import browser from 'webextension-polyfill';
+import cloneDeep from 'lodash.clonedeep';
+import { toCamelCase, sleep, objectHasKey, isObject } from '@/utils/helper';
+import { tasks } from '@/utils/shared';
+import referenceData from '@/utils/reference-data';
+import { convertData, waitTabLoaded, getBlockConnection } from './helper';
+import executeContentScript from './execute-content-script';
+
+class Worker {
+  constructor(engine) {
+    this.id = nanoid();
+    this.engine = engine;
+    this.settings = engine.workflow.settings;
+
+    this.loopList = {};
+    this.repeatedTasks = {};
+    this.preloadScripts = [];
+
+    this.windowId = null;
+    this.currentBlock = null;
+    this.childWorkflowId = null;
+
+    this.activeTab = {
+      url: '',
+      frameId: 0,
+      frames: {},
+      groupId: null,
+      id: engine.options?.tabId,
+    };
+  }
+
+  init({ blockId, prevBlockData, state }) {
+    if (state) {
+      Object.keys(state).forEach((key) => {
+        this[key] = state[key];
+      });
+    }
+
+    const block = this.engine.blocks[blockId];
+    this.executeBlock(block, prevBlockData);
+  }
+
+  addDataToColumn(key, value) {
+    if (Array.isArray(key)) {
+      key.forEach((item) => {
+        if (!isObject(item)) return;
+
+        Object.entries(item).forEach(([itemKey, itemValue]) => {
+          this.addDataToColumn(itemKey, itemValue);
+        });
+      });
+
+      return;
+    }
+
+    const columnId =
+      (this.engine.columns[key] ? key : this.engine.columnsId[key]) || 'column';
+    const currentColumn = this.engine.columns[columnId];
+    const columnName = currentColumn.name || 'column';
+    const convertedValue = convertData(value, currentColumn.type);
+
+    if (objectHasKey(this.engine.referenceData.table, currentColumn.index)) {
+      this.engine.referenceData.table[currentColumn.index][columnName] =
+        convertedValue;
+    } else {
+      this.engine.referenceData.table.push({ [columnName]: convertedValue });
+    }
+
+    currentColumn.index += 1;
+  }
+
+  setVariable(name, value) {
+    this.engine.referenceData.variables[name] = value;
+  }
+
+  executeNextBlocks(connections, prevBlockData) {
+    connections.forEach(({ node }, index) => {
+      if (index === 0) {
+        this.executeBlock(this.engine.blocks[node], prevBlockData);
+      } else {
+        const cloneState = cloneDeep({
+          windowId: this.windowId,
+          activeTab: this.activeTab,
+          preloadScripts: this.preloadScripts,
+        });
+        const state = {
+          ...cloneState,
+          loopList: this.loopList,
+          repeatedTasks: this.repeatedTasks,
+        };
+
+        this.engine.addWorker({
+          state,
+          prevBlockData,
+          blockId: node,
+        });
+      }
+    });
+  }
+
+  async executeBlock(block, prevBlockData, isRetry) {
+    const currentState = await this.engine.states.get(this.engine.id);
+
+    if (!currentState || currentState.isDestroyed) {
+      if (this.engine.isDestroyed) return;
+
+      await this.engine.destroy('stopped');
+      return;
+    }
+
+    this.currentBlock = block;
+
+    if (!isRetry) {
+      await this.engine.updateState({
+        activeTabUrl: this.activeTab.url,
+        childWorkflowId: this.childWorkflowId,
+      });
+    }
+
+    const startExecuteTime = Date.now();
+
+    const blockHandler = this.engine.blocksHandler[toCamelCase(block.name)];
+    const handler =
+      !blockHandler && tasks[block.name].category === 'interaction'
+        ? this.engine.blocksHandler.interactionBlock
+        : blockHandler;
+
+    if (!handler) {
+      this.engine.destroy('stopped');
+      return;
+    }
+
+    const refData = {
+      prevBlockData,
+      ...this.engine.referenceData,
+      activeTabUrl: this.activeTab.url,
+    };
+    const replacedBlock = referenceData({
+      block,
+      data: refData,
+      refKeys:
+        isRetry || block.data.disableBlock
+          ? null
+          : tasks[block.name].refDataKeys,
+    });
+    const blockDelay = this.settings?.blockDelay || 0;
+    const addBlockLog = (status, obj = {}) => {
+      this.engine.addLogHistory({
+        prevBlockData,
+        type: status,
+        name: block.name,
+        description: block.data.description,
+        replacedValue: replacedBlock.replacedValue,
+        duration: Math.round(Date.now() - startExecuteTime),
+        ...obj,
+      });
+    };
+
+    try {
+      let result;
+
+      if (block.data.disableBlock) {
+        result = {
+          data: '',
+          nextBlockId: getBlockConnection(block),
+        };
+      } else {
+        result = await handler.call(this, replacedBlock, {
+          refData,
+          prevBlockData,
+        });
+
+        if (result.replacedValue) {
+          replacedBlock.replacedValue = result.replacedValue;
+        }
+
+        addBlockLog(result.status || 'success', {
+          logId: result.logId,
+        });
+      }
+
+      let nodeConnections = null;
+
+      if (typeof result.nextBlockId === 'string') {
+        nodeConnections = [{ node: result.nextBlockId }];
+      } else {
+        nodeConnections = result.nextBlockId.connections;
+      }
+
+      if (nodeConnections.length > 0) {
+        setTimeout(() => {
+          this.executeNextBlocks(nodeConnections);
+        }, blockDelay);
+      } else {
+        this.engine.destroyWorker(this.id);
+      }
+    } catch (error) {
+      const { onError: blockOnError } = replacedBlock.data;
+      if (blockOnError && blockOnError.enable) {
+        if (blockOnError.retry && blockOnError.retryTimes) {
+          await sleep(blockOnError.retryInterval * 1000);
+          blockOnError.retryTimes -= 1;
+          await this.executeBlock(replacedBlock, prevBlockData, true);
+
+          return;
+        }
+
+        const nextBlocks = getBlockConnection(
+          block,
+          blockOnError.toDo === 'continue' ? 1 : 2
+        );
+        if (blockOnError.toDo !== 'error' && nextBlocks.connections) {
+          this.executeNextBlocks(nextBlocks.connections, prevBlockData);
+          return;
+        }
+      }
+
+      addBlockLog('error', {
+        message: error.message,
+        ...(error.data || {}),
+      });
+
+      const { onError } = this.settings;
+      const nodeConnections = error.nextBlockId.connections;
+
+      if (onError === 'keep-running' && nodeConnections) {
+        setTimeout(() => {
+          this.executeNextBlocks(nodeConnections, error.data || '');
+        }, blockDelay);
+      } else if (onError === 'restart-workflow' && !this.parentWorkflow) {
+        const restartKey = `restart-count:${this.id}`;
+        const restartCount = +localStorage.getItem(restartKey) || 0;
+        const maxRestart = this.settings.restartTimes ?? 3;
+
+        if (restartCount >= maxRestart) {
+          localStorage.removeItem(restartKey);
+          this.engine.destroy();
+          return;
+        }
+
+        this.reset();
+
+        const triggerBlock = Object.values(this.engine.blocks).find(
+          ({ name }) => name === 'trigger'
+        );
+        this.executeBlock(triggerBlock);
+
+        localStorage.setItem(restartKey, restartCount + 1);
+      } else {
+        this.engine.destroy('error', error.message);
+      }
+    }
+  }
+
+  reset() {
+    this.loopList = {};
+    this.repeatedTasks = {};
+
+    this.windowId = null;
+    this.currentBlock = null;
+    this.childWorkflowId = null;
+
+    this.engine.history = [];
+    this.engine.preloadScripts = [];
+    this.engine.columns = { column: { index: 0, name: 'column', type: 'any' } };
+
+    this.activeTab = {
+      url: '',
+      frameId: 0,
+      frames: {},
+      groupId: null,
+      id: this.options?.tabId,
+    };
+    this.engine.referenceData = {
+      table: [],
+      loopData: {},
+      workflow: {},
+      googleSheets: {},
+      variables: this.engine.options.variables,
+      globalData: this.engine.referenceData.globalData,
+    };
+  }
+
+  async _sendMessageToTab(payload, options = {}) {
+    try {
+      if (!this.activeTab.id) {
+        const error = new Error('no-tab');
+        error.workflowId = this.id;
+
+        throw error;
+      }
+
+      await waitTabLoaded(this.activeTab.id);
+      await executeContentScript(
+        this.activeTab.id,
+        this.activeTab.frameId || 0
+      );
+
+      const { executedBlockOnWeb, debugMode } = this.settings;
+      const messagePayload = {
+        isBlock: true,
+        debugMode,
+        executedBlockOnWeb,
+        activeTabId: this.activeTab.id,
+        frameSelector: this.frameSelector,
+        ...payload,
+      };
+
+      const data = await browser.tabs.sendMessage(
+        this.activeTab.id,
+        messagePayload,
+        { frameId: this.activeTab.frameId, ...options }
+      );
+
+      return data;
+    } catch (error) {
+      if (error.message?.startsWith('Could not establish connection')) {
+        error.message = 'Could not establish connection to the active tab';
+      } else if (error.message?.startsWith('No tab')) {
+        error.message = 'active-tab-removed';
+      }
+
+      throw error;
+    }
+  }
+}
+
+export default Worker;

+ 2 - 5
src/components/newtab/workflow/WorkflowBuilder.vue

@@ -749,14 +749,11 @@ export default {
       editor.value.on(
         'connectionCreated',
         ({ output_id, input_id, output_class, input_class }) => {
-          const { outputs } = editor.value.getNodeFromId(output_id);
           const { name: inputName } = editor.value.getNodeFromId(input_id);
-          const { allowedInputs, maxConnection } = tasks[inputName];
+          const { allowedInputs } = tasks[inputName];
           const isAllowed = isInputAllowed(allowedInputs, inputName);
-          const isMaxConnections =
-            outputs[output_class]?.connections.length > maxConnection;
 
-          if (!isAllowed || isMaxConnections) {
+          if (!isAllowed) {
             editor.value.removeSingleConnection(
               output_id,
               input_id,