How-To: Transfer Files using MQTT

What Does This Article Cover?

This How-To article provides an overview of a file replication use case. Files are read from a primary location and forwarded to an MQTT broker, and are then replicated, along with their directory structure, at a secondary location.

  • Prerequisites
  • Project Import
  • Source Files to MQTT
  • Writing Files from MQTT
  • Summary

Prerequisites

  1. You have completed the initial Installation of the Intelligence Hub
  2. You have applied your Intelligence Hub license to your environment
  3. You have an MQTT broker configured
  4. It is recommended to read the Getting Started Series before completing this tutorial exercise
  5. It is recommended to complete “Tutorial: Connections”

Project Import

  1. If required, enable the Intelligence Hub MQTT broker

    • In the left-hand navigation panel, navigate to Manage, and click Settings
    • Under the MQTT Broker section, enable the broker. If ports 1885 and 1886 are already in use on your Intelligence Hub server, change them to ports of your choosing; otherwise accept the defaults. Click Save.
  2. Import the project file

    • In the left-hand navigation panel, navigate to Manage, and click Project
    • Within the Import screen, ensure Full Project is off (otherwise your existing project will be overwritten)
    • Change the Import Type to JSON, paste the following into the Project box, and click Import
    {
    	"productInfo": {
    		"company": "HighByte",
    		"product": "IntelligenceHub",
    		"version": "3.3.0",
    		"build": "2023.12.21.1209",
    		"stage": "Released"
    	},
    	"project": {
    		"version": 6,
    		"connections": [
    			{
    				"name": "MQTT",
    				"uri": "mqtt://0.0.0.0:1885",
    				"tags": [],
    				"writes": {
    					"flattenModeledValues": false
    				},
    				"storeForward": {
    					"enabled": false,
    					"maxEntries": 100,
    					"waitOnFailureInterval": {
    						"duration": 1,
    						"units": "Seconds"
    					}
    				},
    				"settings": {
    					"connectionTimeoutSeconds": 10,
    					"keepAliveSeconds": 60,
    					"requestTimeoutMS": 5000,
    					"cleanSession": true,
    					"ssl": false,
    					"redundantBrokers": [],
    					"inputDiscovery": ""
    				}
    			},
    			{
    				"name": "Source_File",
    				"uri": "file://Source_File",
    				"tags": [],
    				"writes": {
    					"flattenModeledValues": false
    				},
    				"storeForward": {
    					"enabled": false,
    					"maxEntries": 100,
    					"waitOnFailureInterval": {
    						"duration": 1,
    						"units": "Seconds"
    					}
    				},
    				"settings": {
    					"communicationProtocol": {
    						"type": "FILE_PROTOCOL"
    					},
    					"directory": "C:\\HighByte\\Files\\KB",
    					"processedDirectory": "C:\\HighByte\\Files\\KB\\Proc",
    					"errorDirectory": "C:\\HighByte\\Files\\KB\\Error"
    				}
    			},
    			{
    				"name": "Target_File",
    				"uri": "file://Target_File",
    				"description": "Note this Connection would typically exist on a separate Intelligence Hub environment within another network segment. ",
    				"tags": [],
    				"writes": {
    					"flattenModeledValues": false
    				},
    				"storeForward": {
    					"enabled": false,
    					"maxEntries": 100,
    					"waitOnFailureInterval": {
    						"duration": 1,
    						"units": "Seconds"
    					}
    				},
    				"settings": {
    					"communicationProtocol": {
    						"type": "FILE_PROTOCOL"
    					},
    					"directory": "C:\\HighByte\\Files\\KB_Output"
    				}
    			}
    		],
    		"inputs": [
    			{
    				"name": "File_Input",
    				"connection": "MQTT",
    				"type": "mqtt",
    				"qualifier": {
    					"payloadType": "json",
    					"qos": 0,
    					"topic": "Output/#",
    					"includeTopic": false
    				},
    				"cacheLifetime": {
    					"enabled": false
    				}
    			},
    			{
    				"name": "Read_All_Text_CSV",
    				"connection": "Source_File",
    				"type": "file",
    				"qualifier": {
    					"fileName": ".*\\.(txt|TXT|csv|CSV)",
    					"includeMetadata": true,
    					"indexFile": true
    				},
    				"cacheLifetime": {
    					"enabled": false
    				}
    			}
    		],
    		"outputs": [
    			{
    				"name": "File_Output",
    				"connection": "MQTT",
    				"type": "mqtt",
    				"qualifier": {
    					"qos": 0,
    					"namedRoot": false,
    					"breakupArrays": false,
    					"topic": "Output/{{this.filename}}",
    					"retained": false
    				}
    			},
    			{
    				"name": "File_Output",
    				"connection": "Target_File",
    				"type": "file",
    				"qualifier": {
    					"breakupArrays": false,
    					"base64Decoding": true,
    					"fileName": "{{this.filename}}",
    					"payloadReference": "{{this.filevalue}}"
    				}
    			}
    		],
    		"modeling": {
    			"models": [
    				{
    					"name": "File_Model",
    					"tags": [],
    					"attributes": [
    						{
    							"name": "filename",
    							"type": "Any",
    							"array": false,
    							"required": true
    						},
    						{
    							"name": "filevalue",
    							"type": "Any",
    							"array": false,
    							"required": true
    						}
    					]
    				}
    			],
    			"instances": [
    				{
    					"name": "File_Read_Target_Model_Instance",
    					"tags": [],
    					"model": "File_Model",
    					"rootValueAs": "Object",
    					"attributes": [
    						{
    							"name": "filename",
    							"expression": "{{Connection.MQTT.File_Input}}.filename"
    						},
    						{
    							"name": "filevalue",
    							"expression": "{{Connection.MQTT.File_Input}}.filevalue"
    						}
    					]
    				},
    				{
    					"name": "File_Write_Source_Model_Instance",
    					"tags": [],
    					"model": "File_Model",
    					"rootValueAs": "Object",
    					"attributes": [
    						{
    							"name": "filename",
    							"expression": "{{Connection.Source_File.Read_All_Text_CSV}}._filename.replace(/\\\\/g,'/')"
    						},
    						{
    							"name": "filevalue",
    							"expression": "{{Connection.Source_File.Read_All_Text_CSV}}.value"
    						}
    					]
    				}
    			]
    		},
    		"flows": [
    			{
    				"name": "MQTT_To_Target_File",
    				"tags": [],
    				"inReferences": [
    					"{{Instance.File_Read_Target_Model_Instance}}"
    				],
    				"outReferences": [
    					"{{Connection.Target_File.File_Output}}"
    				],
    				"enabled": false,
    				"publishMode": "All",
    				"trigger": {
    					"type": "Exception",
    					"interval": {
    						"duration": 1,
    						"units": "Seconds"
    					},
    					"mode": "Always",
    					"expression": "{{Connection.MQTT.File_Input}}",
    					"delay": {
    						"duration": 0,
    						"units": "Seconds"
    					}
    				}
    			},
    			{
    				"name": "Source_File_To_MQTT",
    				"tags": [],
    				"inReferences": [
    					"{{Instance.File_Write_Source_Model_Instance}}"
    				],
    				"outReferences": [
    					"{{Connection.MQTT.File_Output}}"
    				],
    				"enabled": false,
    				"publishMode": "All",
    				"trigger": {
    					"type": "Polled",
    					"interval": {
    						"duration": 1,
    						"units": "Seconds"
    					},
    					"mode": "WhileTrue",
    					"expression": "{{Connection.Source_File.Read_All_Text_CSV}}!== null",
    					"delay": {
    						"duration": 0,
    						"units": "Seconds"
    					}
    				}
    			}
    		],
    		"conditions": [],
    		"functions": [],
    		"tags": [],
    		"pipelines": []
    	},
    	"network": {
    		"groups": [],
    		"hubs": []
    	}
    }
  3. Update the imported Connections as required

    • Click Connections, click MQTT, and update the MQTT settings as required based on step 1 above (the imported connection URI uses port 1885)

    • Click Connections, click Source_File, and update the following directories to local directories on the Intelligence Hub server. This project file uses:

      • File Directory: C:\HighByte\Files\KB
      • Processed File Directory: C:\HighByte\Files\KB\Proc
      • Error Directory: C:\HighByte\Files\KB\Error
    • Click Connections, click Target_File, and update the File Directory to a local directory on the Intelligence Hub server. This project file uses:

      • File Directory: C:\HighByte\Files\KB_Output
  4. Ensure your source directory contains at least one CSV or text file (optionally inside subfolders, to see the folder structure replicated)

Ensure the account running the Intelligence Hub runtime has read and write access to the relevant directories. Verify that the files can be read by clicking Test Input on the input page. A test write can be performed on the output as well.

  5. Set up the UNS Client

    • In the left-hand navigation panel, navigate to Tools, right-click UNS Client, and select Open Link in New Tab
    • Enter your login information
    • For Connection, select MQTT
    • Under Subscribed Topics, remove the default wildcard entry # and subscribe to the topic Output/#
    • Click Add
    • Click Connect and confirm that the UNS Client shows “Connected to MQTT”
    • Return to the previous tab

Source Files to MQTT

  1. Log in to your Intelligence Hub environment

  2. Review the source data

    • Navigate to Connections and select Source_File

    • Navigate to Inputs

    • Select Read_All_Text_CSV and click Test Input

      • Note the File Name setting: it uses a regular expression to read only files with the extensions .txt, .TXT, .csv and .CSV
      • This input uses the “Process File” functionality: when a file is read from the source and a flow is attempted, Intelligence Hub moves the file from the File Directory to the Processed File Directory, even if the write fails.
      • This input uses the “Include Metadata” setting, which exposes the _filename metadata, including the relevant subfolder path
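To see which files the File Name pattern .*\.(txt|TXT|csv|CSV) selects, the sketch below tests a few sample names in JavaScript. It assumes the pattern must match the whole file name, so it is wrapped in ^...$ here; the sample file names are hypothetical.

```javascript
// Sketch: which file names would the Read_All_Text_CSV File Name pattern select?
// Assumption: the pattern must match the whole name, so it is anchored with ^...$.
const fileNamePattern = /^.*\.(txt|TXT|csv|CSV)$/;

// Hypothetical file names found in the source directory.
const candidates = ["report.csv", "notes.TXT", "image.png", "data.Csv", "archive.csv.zip"];

const selected = candidates.filter((name) => fileNamePattern.test(name));
console.log(selected); // [ 'report.csv', 'notes.TXT' ]
```

Mixed-case extensions such as .Csv are not matched by this pattern, so they would stay in the source directory.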

  3. Navigate to Instances, select File_Write_Source_Model_Instance, and click Test Instance

    • Note that the _filename metadata value is mapped to the filename attribute, and the binary value of the file is mapped to the filevalue attribute.

    • A simple JavaScript replace function replaces the backslash (\) characters with forward slashes (/). Because MQTT uses '/' as its topic level delimiter, this replicates and retains the subfolder structure in the topic:

      .replace(/\\/g,'/')

      Without the replace expression: "filename": "Subfolder2\\file3.txt"
      With the replace expression:    "filename": "Subfolder2/file3.txt"
    • Note: On Linux-based file systems this .replace expression is not needed, since '/' is already the path delimiter.

Note that characters recognized as delimiters for the topic structure may be different and/or configurable with a third-party MQTT service.
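The filename mapping can be reproduced outside the Hub with plain JavaScript. This is a minimal sketch: the sample _filename value is taken from the example above, and building the topic by prefixing Output/ mirrors the output topic setting rather than calling any Hub API.

```javascript
// Sketch of the filename mapping in File_Write_Source_Model_Instance.
// Sample _filename value with a Windows backslash (the string Subfolder2\file3.txt).
const rawFilename = "Subfolder2\\file3.txt";

// Same expression as the instance attribute: replace every backslash with '/'.
const filename = rawFilename.replace(/\\/g, "/");

// MQTT treats '/' as the topic level separator, so each folder becomes a topic level.
const topic = `Output/${filename}`;
console.log(topic);            // Output/Subfolder2/file3.txt
console.log(topic.split("/")); // [ 'Output', 'Subfolder2', 'file3.txt' ]
```

The g flag matters: without it, only the first backslash of a deeply nested path would be replaced.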

  4. Review the output: navigate to Connections, select MQTT, select Outputs, and open File_Output. Note the output topic Output/{{this.filename}}, which uses dynamic referencing to insert the filename attribute of the source instance.

Dynamic referencing plays a critical role here, because this use case is designed to read and process many files with unique names.
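To illustrate why dynamic referencing matters, the sketch below resolves the topic template once per instance value, so every file gets its own topic. The resolveTemplate helper is hypothetical and only imitates the {{this.attribute}} substitution; it is not the Intelligence Hub implementation.

```javascript
// Hypothetical helper: replaces each {{this.<attribute>}} placeholder with that
// attribute of the current instance value. Illustration only, not Hub code.
function resolveTemplate(template, instance) {
  return template.replace(/\{\{this\.(\w+)\}\}/g, (match, attribute) => {
    if (!(attribute in instance)) {
      throw new Error(`Unknown attribute: ${attribute}`);
    }
    return String(instance[attribute]);
  });
}

// Two instance values produced from two different source files.
const instances = [
  { filename: "file1.csv", filevalue: "<file content>" },
  { filename: "Subfolder2/file3.txt", filevalue: "<file content>" },
];

for (const instance of instances) {
  console.log(resolveTemplate("Output/{{this.filename}}", instance));
}
// Output/file1.csv
// Output/Subfolder2/file3.txt
```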

  5. Navigate to Flows and open Source_File_To_MQTT

    • Note that the flow uses the previously reviewed instance as its source and the MQTT output as its target
    • To prevent false errors in this use case, the flow uses a Polled trigger in While True mode with the following expression: {{Connection.Source_File.Read_All_Text_CSV}} !== null

This simple JavaScript expression ensures that a write to the target is attempted only when a file appears in the source file directory.
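The effect of the While True condition can be simulated in a few lines. This sketch is not Intelligence Hub code: it assumes each array element stands for one poll interval, with null meaning the input found no file and an object standing in for the file read at that poll.

```javascript
// Simulation (not Intelligence Hub code) of a Polled trigger in While True mode.
// Each element is one poll: null = no file read, object = the file returned.
const pollResults = [
  null,
  { _filename: "file1.csv" },
  { _filename: "file2.txt" },
  null,
];

const published = [];
for (const value of pollResults) {
  // Same test as the trigger expression {{Connection.Source_File.Read_All_Text_CSV}} !== null
  if (value !== null) {
    published.push(value._filename); // the flow runs and publishes to MQTT
  }
  // When the expression is false, the flow does not run and no write is attempted.
}

console.log(published); // [ 'file1.csv', 'file2.txt' ]
```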

  6. Enable the Source_File_To_MQTT flow and review the results in your UNS Client

    • Assuming you have .txt or .csv files in the Source_File directory, you should see the root and nested files replicated under the Output MQTT topic space
    • Note that the processed files were moved from the Source_File File Directory to the Processed File Directory; this should also be logged in the Intelligence Hub log file

Writing Files from MQTT

Note that this section of the use case would typically be configured on a separate Intelligence Hub instance, usually within a separate network segment, to read the files from MQTT and write them to a new target directory.

  1. Log in to your Intelligence Hub environment

  2. Review the source MQTT Input

    • Navigate to Connections
    • Select MQTT
    • Navigate to Inputs
    • Select File_Input
    • Note that the input uses the MQTT multi-level wildcard # and reads everything within Output/#
    • Perform a Test Input and review the results
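The subscription Output/# relies on standard MQTT wildcard rules. The sketch below is a simplified topic-filter matcher written for illustration: it handles the single-level + and multi-level # wildcards, but ignores $-prefixed system topics and does not validate filters, so it is not a complete implementation of the MQTT specification.

```javascript
// Simplified MQTT topic-filter matching ('+' = one level, '#' = this level and below).
// Ignores '$'-prefixed system topics and does not validate malformed filters.
function topicMatches(filter, topic) {
  const f = filter.split("/");
  const t = topic.split("/");
  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") return true;   // matches the parent level and everything below it
    if (i >= t.length) return false; // topic has fewer levels than the filter
    if (f[i] !== "+" && f[i] !== t[i]) return false;
  }
  return f.length === t.length;      // no wildcard left, so level counts must agree
}

console.log(topicMatches("Output/#", "Output/file1.csv"));            // true
console.log(topicMatches("Output/#", "Output/Subfolder2/file3.txt")); // true
console.log(topicMatches("Output/#", "Status/file1.csv"));            // false
```

Because # spans any number of levels, files from every nested subfolder reach File_Input through this single subscription.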

  3. Review the target file output

    • Navigate to Connections
    • Select Target_File, navigate to Outputs, and select File_Output
    • Note that dynamic referencing is used for the File Name and Payload Reference settings, so the Instance attribute values determine each file's name and contents
    • Also note that Base64 Decoding must be enabled to decode the incoming filevalue back into the original file contents
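The role of Base64 Decoding can be shown with a Node.js round trip. This is a sketch under an assumption: that the source side carries the file content in filevalue as a base64 string. The sample CSV content is hypothetical, and Buffer is used only to illustrate the encoding, not to mirror Hub internals.

```javascript
// Sketch (Node.js): the round trip that the Base64 Decoding setting reverses.
// Assumption: file content travels over MQTT in filevalue as a base64 string.
const original = "id,temp\n1,21.5\n"; // hypothetical CSV file content

// What the filevalue attribute would carry in the MQTT payload.
const filevalue = Buffer.from(original, "utf8").toString("base64");

// With Base64 Decoding enabled, the decoded bytes (not the base64 text) are written.
const decoded = Buffer.from(filevalue, "base64");
console.log(decoded.toString("utf8") === original); // true
```

Without decoding, the target file would contain the base64 text itself rather than the original contents.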
  4. Navigate to Flows

    • Select MQTT_To_Target_File
    • Note the Source and Targets
    • Note that this is an Event type flow, because its source is an MQTT input
    • To prevent false errors in this use case, the flow uses the Event/Always mode with the following expression: {{Connection.MQTT.File_Input}}

This utilizes a simple JavaScript expression to ensure a write to target is not attempted when the source MQTT input is null or empty.
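The event-driven behaviour can be sketched as a message handler. This is a simulation, not Hub code: the onFileMessage function and the Map standing in for the Target_File directory are hypothetical, and the sample filevalue strings are arbitrary base64 text.

```javascript
// Simulation (not Intelligence Hub code) of an event-triggered flow:
// the write runs once per incoming MQTT message, and null or empty payloads are skipped.
const targetDirectory = new Map(); // hypothetical stand-in for the Target_File directory

function onFileMessage(payload) {
  if (payload === null || payload === undefined || Object.keys(payload).length === 0) {
    return false; // nothing to write, so no write is attempted
  }
  targetDirectory.set(payload.filename, payload.filevalue);
  return true;
}

onFileMessage({ filename: "file1.csv", filevalue: "aWQK" });
onFileMessage(null);
onFileMessage({});
onFileMessage({ filename: "Subfolder2/file3.txt", filevalue: "dGV4dAo=" });
console.log([...targetDirectory.keys()]); // [ 'file1.csv', 'Subfolder2/file3.txt' ]
```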

  5. Enable the MQTT_To_Target_File flow

    • Within your operating system’s file explorer, review the results in your target directory
    • If there are no files, you may need to copy the processed files back to the source file directory so they are read and published again. Because the MQTT File_Output does not publish retained messages, files published before this flow was enabled are not redelivered.

Summary

This How-To article shows how to read source files and replicate their file names and subfolder directories to another target file directory, using MQTT as the intermediary. This use case enables secure file movement using the hub and spoke model.

Here the source file structure is reflected in the MQTT topic path, but a different topic structure for your MQTT hierarchy may be better suited in some cases. In that case, the file structure can be published as a value instead and still applied at the consumer's file output using dynamic referencing, reaching the same outcome.
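The alternative described above can be sketched as follows: every file is published to one fixed topic, and the relative path travels inside the payload. The topic name Files/Transfer and both helper functions are hypothetical; the consumer side mirrors how the Target_File output already takes its file name from {{this.filename}} rather than from the topic.

```javascript
// Sketch: publish every file to one fixed topic and carry the relative path in the payload.
// The topic "Files/Transfer" and both helpers are hypothetical.
function buildMessage(filename, filevalue) {
  return {
    topic: "Files/Transfer", // fixed, independent of the file path
    payload: JSON.stringify({ filename, filevalue }),
  };
}

// Consumer side: the relative path comes from the payload, not from the topic.
function targetPathFor(message) {
  const { filename } = JSON.parse(message.payload);
  return filename; // the file output would resolve this under its File Directory
}

const msg = buildMessage("Subfolder2/file3.txt", "dGV4dAo=");
console.log(msg.topic);          // Files/Transfer
console.log(targetPathFor(msg)); // Subfolder2/file3.txt
```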

Additional Resources