Using the AMQP Source Component

The AMQP Source Component is an SSIS data flow pipeline component that receives data from an AMQP 1.0-compliant broker. The component has the following two configuration pages:

  • General
  • Columns

General Page

The General page lets you configure how the component receives messages from an AMQP broker.

AMQP SSIS Source Component

Connection Manager

The source component requires a connection to access a message queue. The Connection Manager drop-down lists all KingswaySoft AMQP Connection Managers and Microsoft AMQP Connection Managers that have been created in the current SSIS package.

Source

The name of the queue to receive messages from.

Durable

Specifies whether the source is durable, that is, whether it survives a broker restart.

Receive Mode

There are two modes available for receiving messages:

  • Peek: Messages will be received from the queue without deleting them or modifying the queue in any way.
  • Receive Messages and Delete: Messages will be retrieved and deleted from the queue.
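
The difference between the two modes can be illustrated with a minimal Python sketch. It models the queue as an in-memory deque rather than a real broker, and the peek and receive_and_delete function names are hypothetical; they are not part of the component or of any AMQP library.

```python
from collections import deque

def peek(queue):
    """Return every message without removing anything from the queue."""
    return list(queue)

def receive_and_delete(queue):
    """Return every message and remove each one from the queue."""
    messages = []
    while queue:
        messages.append(queue.popleft())
    return messages

# Hypothetical sample messages standing in for a broker queue.
queue = deque(["order-1", "order-2"])

peeked = peek(queue)
remaining_after_peek = len(queue)        # the queue is unchanged by a peek

received = receive_and_delete(queue)
remaining_after_receive = len(queue)     # the queue has been drained
```

In this model, running the peek twice returns the same messages both times, whereas a second receive-and-delete returns nothing.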

Body Column Type

The Body Column Type specifies whether the message body is read from AMQP as binary content or as text content. Two options are available: Binary and Text. The default setting is Text.

Encoding

The encoding to use to decode the body of received messages. This option is only available when the Body Column Type is Text.
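
As a rough illustration of why the encoding matters, the Python sketch below decodes the same raw bytes in different ways. The decode_body helper and the sample payload are assumptions made for this example only; they do not describe how the component decodes messages internally.

```python
def decode_body(raw_body, body_column_type="Text", encoding="utf-8"):
    """Return the body as decoded text, or as raw bytes for the Binary type."""
    if body_column_type == "Binary":
        # Binary: the bytes are passed through unchanged, so no encoding applies.
        return raw_body
    # Text: the bytes are decoded with the selected encoding.
    return raw_body.decode(encoding)

# Hypothetical message body that was produced as UTF-8 text.
raw = "café".encode("utf-8")

as_text = decode_body(raw, "Text", "utf-8")        # matches what the sender wrote
as_latin1 = decode_body(raw, "Text", "latin-1")    # wrong encoding garbles the text
as_binary = decode_body(raw, "Binary")             # raw bytes, untouched
```

Choosing an encoding that differs from the one the sender used does not raise an error in this case; it silently yields different characters, which is why the setting should match the producer.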

Listener Mode

After the AMQP component has retrieved all available messages from the queue, it can continue to listen for more messages for a period of time.

Besides None, there are three listener modes available:

  • Fixed Time Mode: Allows the component to listen for messages for a specified period of time.
  • Wait Until Mode: Allows the component to listen for messages until a specified date and time.
  • Wait Until Variable Mode: Allows the component to listen for messages until a date and time specified in a variable.

Listener modes other than None are only compatible with the Receive Messages and Delete receive mode.
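
One way to think about the three listener modes is that each resolves to a single moment at which listening stops. The Python sketch below expresses that idea; the listen_deadline function, its parameters, and the User::StopAt variable name are hypothetical and only illustrate the concept.

```python
from datetime import datetime, timedelta

def listen_deadline(mode, start, duration=None, until=None,
                    variables=None, variable_name=None):
    """Return the moment at which listening stops, or None if not listening."""
    if mode == "None":
        return None
    if mode == "Fixed Time":
        return start + duration            # listen for a fixed period
    if mode == "Wait Until":
        return until                       # listen until a fixed date and time
    if mode == "Wait Until Variable":
        return variables[variable_name]    # date and time read from a variable
    raise ValueError(f"unknown listener mode: {mode}")

start = datetime(2024, 1, 1, 9, 0, 0)

fixed = listen_deadline("Fixed Time", start, duration=timedelta(minutes=5))
until = listen_deadline("Wait Until", start, until=datetime(2024, 1, 1, 17, 0, 0))
from_var = listen_deadline(
    "Wait Until Variable", start,
    variables={"User::StopAt": datetime(2024, 1, 2)},  # hypothetical variable
    variable_name="User::StopAt",
)
```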

When new messages are detected in listener mode, they may not enter the SSIS pipeline immediately; this depends on the buffer settings of the data flow task. Internally, any row that is directed to an output goes into an SSIS buffer, and SSIS does not pass the rows in that buffer downstream until the buffer is considered full or the component has finished processing all of its data. While in listener mode, the component is considered to be processing data until the listener time has expired, at which point any rows remaining in the buffer are guaranteed to be processed. To direct rows as quickly as possible in listener mode, consider lowering the DefaultBufferMaxRows and DefaultBufferSize properties of the task to the lowest possible values.
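
The effect of the buffer size can be seen in a toy model. The Python sketch below releases rows only when a batch is full or the source has finished; it is a deliberate simplification that counts rows only (the real SSIS buffer manager also takes the buffer size in bytes into account), and the simulate_buffer name is hypothetical.

```python
def simulate_buffer(rows, max_rows):
    """Yield batches the way a fill-then-flush buffer would release them.

    A batch is released only when it holds max_rows rows, or when the
    source has finished and a partial batch remains.
    """
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == max_rows:
            yield batch
            batch = []
    if batch:
        # The source is done, so the partial batch is finally released.
        yield batch

# Five incoming rows with a large buffer: nothing is released until the end.
large = list(simulate_buffer(range(5), max_rows=10000))

# The same rows with a small buffer: rows are released in small, early batches.
small = list(simulate_buffer(range(5), max_rows=2))
```

With the large setting, all five rows wait for the source to finish, which in listener mode means waiting for the listener time to expire. With the small setting, the first rows are released as soon as two have arrived.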

Refresh Component Button

Clicking the Refresh Component button causes the component to retrieve the latest metadata and update each field accordingly. It also removes any custom fields that have been added on the Columns page.

Expression fx Icon

Click the blue fx icon to launch the SSIS Expression Editor, which enables dynamic updates of the property at run time.

Generate Documentation Icon

Click the Generate Documentation icon to generate a Word document that describes the component's metadata, including any relevant mappings.

Columns Page

The Columns page shows you a default list of properties of messages that will be retrieved. You may indicate which columns to include in your source component by checking or unchecking the checkbox next to each attribute.

AMQP SSIS Source Component - Columns

The Columns Page grid consists of:

  • AMQP Field: Fields that will be retrieved when reading the messages.
  • Data Type: The data type of this field.

Note: As a general best practice, you should only select the fields that are needed for the downstream pipeline components.

Source Address

For sources, the supported address formats depend on the broker:

ActiveMQ:

  • {queue_name}
  • topic://{topic_name}

RabbitMQ:

  • {queue_name}
  • /queue/{queue_name}
  • /amq/queue/{queue_name}
  • /topic/{routing_key}
  • /exchange/{exchange_name}/{routing_key}

Azure Service Bus:

  • {queue_name}
  • {topic_name}/Subscriptions/{subscription_name}
  • {event_hub_name}/ConsumerGroups/{consumer_group_name}/Partitions/{partition_number}
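
If the address is supplied dynamically, for example through an expression or a variable, small helpers can keep the patterns listed above consistent. The following Python sketch only performs string formatting; the function names and the sample values are hypothetical.

```python
def activemq_topic_address(topic_name):
    """ActiveMQ topic: topic://{topic_name}"""
    return f"topic://{topic_name}"

def rabbitmq_exchange_address(exchange_name, routing_key):
    """RabbitMQ exchange: /exchange/{exchange_name}/{routing_key}"""
    return f"/exchange/{exchange_name}/{routing_key}"

def service_bus_subscription_address(topic_name, subscription_name):
    """Azure Service Bus: {topic_name}/Subscriptions/{subscription_name}"""
    return f"{topic_name}/Subscriptions/{subscription_name}"

def event_hub_partition_address(event_hub_name, consumer_group_name, partition_number):
    """Event hub partition, following the last pattern in the list."""
    return (f"{event_hub_name}/ConsumerGroups/{consumer_group_name}"
            f"/Partitions/{partition_number}")

# Hypothetical sample names, used only to show the resulting strings.
topic = activemq_topic_address("prices")
exchange = rabbitmq_exchange_address("orders", "eu.new")
subscription = service_bus_subscription_address("invoices", "audit")
partition = event_hub_partition_address("telemetry", "$Default", 0)
```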