Example Plugin: Slack

In this example, we're going to walk through a plugin that posts messages into Slack.

Outbound, it illustrates an event-style scenario, where we only action newly created Snowflake records once, then we forget about them.

Inbound, it's just a simple example of fetching some useful information from a messaging system.

The full source code for the plugin is here, but I'm going to step through each section and explain what it does.

Please note, this example is illustrative only and uses a minimal amount of code. A production plugin would typically include a lot more error condition checking and logging statements.

Structure

After scrolling past the import statements, the first thing to notice is that the plugin code is in a single file containing a Python class that inherits from OmnataPlugin.

class SlackPlugin(OmnataPlugin):
    """
    An example plugin which posts messages to Slack, and retrieves the list of channels.
    """
    def __init__(self):
        OmnataPlugin.__init__(self)
        self.slack_user_email_cache = {}
        self.headers = None

You can import from as many other local files as you like, if that's your thing.

There is also an icon.svg file which contains the icon for the app. The only requirement here is that the width and height at the root of the SVG document are set to "100%".
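
If you'd like to check the icon before uploading, the sizing requirement can be verified with a few lines of standard-library Python. This is only a sketch: the icon_has_full_size helper and the sample SVG strings are hypothetical and not part of the Omnata tooling.

```python
import xml.etree.ElementTree as ET


def icon_has_full_size(svg_text: str) -> bool:
    """Return True if the root <svg> element sets width and height to "100%"."""
    root = ET.fromstring(svg_text)
    # With an xmlns declaration the root tag is namespaced,
    # e.g. "{http://www.w3.org/2000/svg}svg", so accept both forms.
    is_svg_root = root.tag == "svg" or root.tag.endswith("}svg")
    return is_svg_root and root.get("width") == "100%" and root.get("height") == "100%"


# Hypothetical sample icons, for illustration only
good_icon = '<svg xmlns="http://www.w3.org/2000/svg" width="100%" height="100%" viewBox="0 0 24 24"></svg>'
bad_icon = '<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24"></svg>'

print(icon_has_full_size(good_icon), icon_has_full_size(bad_icon))
```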

Manifest

Then we override the get_manifest method and return information about the plugin.

def get_manifest(self) -> PluginManifest:
    return PluginManifest(
        plugin_id="slack",
        plugin_name="Slack",
        developer_id="omnata",
        developer_name="Omnata",
        docs_url="https://docs.omnata.com",
        supports_inbound=True,
        supported_outbound_strategies=[SendSyncStrategy()],
    )

The plugin_name value determines what the app is named in the Omnata Sync UI.

We only support the "Send" outbound strategy because most people live in the moment in Slack. We could build a plugin that updates and deletes previous Slack messages after the original Snowflake record is updated or deleted, but that's probably overkill, right?

Connecting to the app

When a user selects a plugin, the first thing we ask is that they connect to their instance of that application. The connection_form method is called so that we can render a form to display to the user.

In our case, the user will create a Slack App and provide the Client ID and Client Secret.


def connection_form(self) -> List[ConnectionMethod]:
    return [
        ConnectionMethod(
            name="OAuth (User Created App)",
            fields=[],
            oauth_template=SecurityIntegrationTemplate(
                oauth_client_id="<Client ID from App Credentials>",
                oauth_client_secret="<Client Secret from App Credentials>",
                oauth_token_endpoint="https://slack.com/api/oauth.v2.access",
                oauth_authorization_endpoint="https://slack.com/oauth/v2/authorize",
                oauth_allowed_scopes=["chat:write", "users:read.email", "users:read", "channels:read"],
            ),
        )
    ]

Notice too that our form was wrapped in a ConnectionMethod and we return a list of these. If the app has multiple ways of connecting (e.g. API keys, credentials, SSH keys), we can provide a list of options to the user.

After all connection parameters have been gathered, and any OAuth flows completed, the plugin will be asked which network addresses need to be permitted:

def network_addresses(self, parameters: ConnectionConfigurationParameters) -> List[str]:
    return ["slack.com", "www.slack.com"]

The user will be shown these (just for their own peace of mind), and then a connection test will be performed by invoking the connect function:

def connect(self, parameters: ConnectionConfigurationParameters) -> ConnectResponse:
    self._get_webclient(parameters).auth_test()
    return ConnectResponse()

def _get_webclient(self, parameters: SyncConfigurationParameters) -> WebClient:
    access_token = parameters.get_connection_secret("access_token").value
    return WebClient(token=access_token)

Here, we're just using the slack_sdk Python package to test the access token. Note that "access_token" is a special parameter created by the OAuth connection process. In other words, you access it like any other Omnata connection form value, instead of using the Snowflake secrets SDK.

Next, we see the API limits defined:

def api_limits(self, parameters: SyncConfigurationParameters) -> List[ApiLimits]:
    return [
        ApiLimits(
            endpoint_category="Web API Tier 1",
            request_matchers=[
                HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex="admin.teams.create")
            ],
            request_rates=[RequestRateLimit(request_count=5, time_unit="minute", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Web API Tier 2",
            request_matchers=[
                HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex="conversations.list")
            ],
            request_rates=[RequestRateLimit(request_count=25, time_unit="minute", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Web API Tier 3",
            request_matchers=[
                HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex="conversations.history")
            ],
            request_rates=[RequestRateLimit(request_count=55, time_unit="minute", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Web API Tier 4",
            request_matchers=[HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex="users.info")],
            request_rates=[RequestRateLimit(request_count=105, time_unit="minute", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Posting messages",
            request_matchers=[
                HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex="chat.postMessage")
            ],
            request_rates=[RequestRateLimit(request_count=1, time_unit="second", unit_count=1)],
        ),
    ]

These rate limits are displayed to the user, who can override them.

More information on rate limits can be found at Rate Limiting.
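
To get a feel for what these definitions mean in practice, here is a rough sketch of how a URL pattern and a request rate translate into a minimum gap between requests. It does not use the Omnata classes: SimpleLimit and find_limit are hypothetical stand-ins, and matching with re.search is an assumption about how a url_regex might be applied, not a description of the runtime's internals.

```python
import re
from dataclasses import dataclass
from typing import List, Optional

SECONDS_PER_UNIT = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


@dataclass
class SimpleLimit:
    """Hypothetical, simplified stand-in for one rate limit entry."""
    category: str
    url_regex: str
    request_count: int
    time_unit: str
    unit_count: int = 1

    def seconds_between_requests(self) -> float:
        # e.g. 25 requests per 1 minute -> 60 / 25 = 2.4 seconds apart
        window = SECONDS_PER_UNIT[self.time_unit] * self.unit_count
        return window / self.request_count


def find_limit(limits: List[SimpleLimit], url: str) -> Optional[SimpleLimit]:
    """Return the first limit whose pattern is found in the URL (assumed behaviour)."""
    for limit in limits:
        if re.search(limit.url_regex, url):
            return limit
    return None


limits = [
    SimpleLimit("Web API Tier 2", "conversations.list", 25, "minute"),
    SimpleLimit("Posting messages", "chat.postMessage", 1, "second"),
]

matched = find_limit(limits, "https://slack.com/api/chat.postMessage")
print(matched.category, matched.seconds_between_requests())
```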

Outbound Syncs

Configuration Form

Now that the connection to the app has been established and we know our source table, we can render the form that configures outbound syncs.

The Slack scenario is pretty straightforward: we need to know which channel to post into, and we need the user to design a message template.

def outbound_configuration_form(self, parameters: OutboundSyncConfigurationParameters) -> OutboundSyncConfigurationForm:
    return OutboundSyncConfigurationForm(
        fields=[
            FormDropdownField(
                name="channel",
                label="Channel to post in",
                help_text="If the channel is not visible, you need to invite the bot user to the channel",
                required=True,
                data_source=DynamicFormOptionsDataSource(source_function=self._fetch_channel_list),
            )
        ],
        mapper=FormJinjaTemplate(label="Message Template"),
    )

First, we've defined a single form field to ask the user which channel to post the messages to, and we've used the FormJinjaTemplate mapper so that the user can design the message.

Here's what the form looks like to the user:

How was the list of channels fetched from the user's own Slack org? Notice our FormDropdownField had a DynamicFormOptionsDataSource pointing to the _fetch_channel_list function. This means that when it's time to render that dropdown, we'll call out to the plugin class to provide the options.

def _fetch_channel_list(self, parameters: OutboundSyncConfigurationParameters) -> List[FormOption]:
    slack_client = self._get_webclient(parameters)
    response = slack_client.conversations_list()
    self._raise_error_if_not_ok(response)
    fields = []
    for channel in response.data["channels"]:
        if channel["is_member"]:
            fields.append(
                FormOption(
                    value=channel["name"], label="#" + channel["name"], data_type_icon="text", metadata=channel
                )
            )
    return sorted(fields, key=lambda d: d["label"])

The function calls the conversations.list API method to fetch the channels. For each channel in the response, we shape it into a FormOption so that the Omnata UI knows how to display it.
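
The shaping step can be tried out without a Slack connection. The sketch below applies the same filter-and-sort logic to plain dictionaries; channels_to_options and the sample channel data are hypothetical, and dictionaries stand in for the FormOption objects used by the real plugin.

```python
from typing import Any, Dict, List


def channels_to_options(channels: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep channels the bot is a member of and shape them as dropdown options."""
    options = [
        {"value": channel["name"], "label": "#" + channel["name"], "metadata": channel}
        for channel in channels
        if channel.get("is_member")
    ]
    # Sort alphabetically by the label shown to the user
    return sorted(options, key=lambda option: option["label"])


# Made-up sample of the "channels" list in a conversations.list response
sample_channels = [
    {"id": "C2", "name": "random", "is_member": True},
    {"id": "C3", "name": "secret-project", "is_member": False},
    {"id": "C1", "name": "general", "is_member": True},
]

labels = [option["label"] for option in channels_to_options(sample_channels)]
print(labels)
```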

The user can choose their own message template and interpolate the Snowflake column values using Jinja functions.

Sync the Records

Now for the exciting part!

Routinely, according to the schedule the user chooses, the sync_outbound method on our plugin class will be called. It's the job of that method to load the records into the app, and report back on the outcome.

First, it's important to know that before this function is invoked, the plugin runtime has already modified Python's http package to enforce the rate limits which have been set. This means that you don't need to worry about waiting between HTTP requests in your code.
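
To illustrate the idea of enforced spacing between requests, here is a minimal throttle sketch. It is not how the plugin runtime is implemented; the Throttle class is hypothetical, and the clock and sleep functions are injectable so the behaviour can be demonstrated without actually waiting.

```python
import time
from typing import Callable, List, Optional


class Throttle:
    """Hypothetical helper that spaces calls at least min_interval seconds apart."""

    def __init__(
        self,
        min_interval: float,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ):
        self.min_interval = min_interval
        self.clock = clock
        self.sleep = sleep
        self._next_allowed: Optional[float] = None

    def wait(self) -> float:
        """Block until the next call is allowed; return the seconds waited."""
        now = self.clock()
        delay = 0.0
        if self._next_allowed is not None and now < self._next_allowed:
            delay = self._next_allowed - now
            self.sleep(delay)
            now = self._next_allowed
        self._next_allowed = now + self.min_interval
        return delay


# Demonstration with a fake clock, so nothing really sleeps
fake_now: List[float] = [100.0]


def fake_sleep(seconds: float) -> None:
    fake_now[0] += seconds


throttle = Throttle(1.0, clock=lambda: fake_now[0], sleep=fake_sleep)
first_delay = throttle.wait()   # nothing to wait for on the first call
second_delay = throttle.wait()  # called immediately afterwards, so it waits
print(first_delay, second_delay)
```

A caller would invoke wait() just before each HTTP request; in the plugin you don't write this yourself, because the runtime applies the limits for you.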

In our function, we have:

def sync_outbound(self, parameters: OutboundSyncConfigurationParameters, outbound_sync_request: OutboundSyncRequest):
    records = outbound_sync_request.get_records(batched=True)
    channel = parameters.get_sync_parameter("channel").value
    webclient = self._get_webclient(parameters)
    self.record_upload(data_frame=records, channel=channel, webclient=webclient)

Here we fetch the records into a batched dataframe, which is passed into the record_upload function along with the channel name and the Slack webclient.

Let's take a look at that function:

@managed_outbound_processing(concurrency=2, batch_size=1)
def record_upload(self, data_frame: pandas.DataFrame, channel: str, webclient: WebClient) -> pandas.DataFrame:
    load_records_frame = None
    logger.info(f"record_upload given {len(data_frame)} records")
    for index, row in data_frame.iterrows():
        transformed_record = row["TRANSFORMED_RECORD"]
        response = webclient.chat_postMessage(channel=f"#{channel}", text=transformed_record)
        self._raise_error_if_not_ok(response)
        response_df = pandas.DataFrame(
            [
                {
                    "IDENTIFIER": row["IDENTIFIER"],
                    "APP_IDENTIFIER": None,
                    "SUCCESS": response.data["ok"],
                    "RESULT": response.data,
                }
            ]
        )
        load_records_frame = pandas.concat([load_records_frame, response_df])
    return load_records_frame

The first thing to notice is that the method has the @managed_outbound_processing decorator. This decorator is provided by the Omnata Plugin Runtime, and takes care of invoking the function in parallel (if needed), with the appropriate batch size.

In our case, we've set a concurrency of 2 so that up to two requests can hit Slack concurrently (still subject to the rate limits), and a batch_size of 1 so that the method is invoked with a single record in the dataframe at a time. This might seem a bit pointless here, but it's convenient for APIs that accept multiple records at a time (e.g. batches of 10), because it means you don't need to split the data yourself.

The advantage of working in batches is that it's memory efficient. The plugin runtime will request more data from Snowflake only when needed, so you can process very large payloads with only an XS warehouse.
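
For intuition, the batching and concurrency behaviour can be approximated with the standard library. This is a simplified sketch, not the decorator's implementation: batches, process_in_batches and shout are hypothetical names, and unlike the runtime described above, Executor.map submits every batch up front rather than pulling data lazily.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def batches(records: Iterable, batch_size: int) -> Iterator[List]:
    """Yield consecutive lists of at most batch_size records."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def process_in_batches(
    records: Iterable,
    handler: Callable[[List], List],
    batch_size: int,
    concurrency: int,
) -> List:
    """Run handler over each batch using up to `concurrency` worker threads."""
    results: List = []
    with ThreadPoolExecutor(max_workers=concurrency) as pool:
        # pool.map returns results in the same order the batches were submitted
        for batch_result in pool.map(handler, batches(records, batch_size)):
            results.extend(batch_result)
    return results


def shout(batch: List[str]) -> List[str]:
    """Toy handler standing in for a call to a remote API."""
    return [message.upper() for message in batch]


print(process_in_batches(["a", "b", "c"], shout, batch_size=2, concurrency=2))
```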

Inside the method, we simply post the 'TRANSFORMED_RECORD' value into the channel. The plugin runtime will have automatically rendered the Jinja template.

The sync_outbound method is responsible for returning a dataframe containing:

  • IDENTIFIER - the identifier provided in the source dataframe

  • APP_IDENTIFIER - the identifier from the remote system. A value is only necessary if you plan to update/delete using this

  • SUCCESS - a boolean flag to indicate success or failure. In our Slack plugin, we just set it to the value returned in the 'ok' field

  • RESULT - a JSON object containing further details if necessary, including any error messages

Normally you can use the enqueue_results function on the outbound_sync_request parameter to send these results back as you process them. Because we're doing our work inside the @managed_outbound_processing-annotated method, we can just pass back a response for the records we're given on each iteration.
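
As a quick illustration of the four result columns, the sketch below builds one result row from a Slack-style response body. The result_row helper and the sample responses are hypothetical, and plain dictionaries are used in place of a dataframe to keep the example dependency-free.

```python
from typing import Any, Dict


def result_row(identifier: str, response_data: Dict[str, Any]) -> Dict[str, Any]:
    """Shape a Slack-style response body into the four expected result columns."""
    return {
        "IDENTIFIER": identifier,
        # Not needed for a send-only plugin, since nothing is updated or deleted later
        "APP_IDENTIFIER": None,
        "SUCCESS": bool(response_data.get("ok", False)),
        "RESULT": response_data,
    }


# Made-up response bodies for illustration
success_row = result_row("row-1", {"ok": True, "channel": "C1"})
failure_row = result_row("row-2", {"ok": False, "error": "channel_not_found"})
print(success_row["SUCCESS"], failure_row["SUCCESS"])
```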

One last thing

You might have noticed the lookup_slack_user_by_email method with the jinja_filter decorator. That decorator provides the user with a custom Jinja filter to use in their template.

@jinja_filter
def lookup_slack_user_by_email(self, value):
    if value in self.slack_user_email_cache:
        return self.slack_user_email_cache[value]
    logger.info(f"looking up user by email {value}")
    # to access the configuration parameters from within a Jinja filter, use the _configuration_parameters attribute
    response = self._get_webclient(self._configuration_parameters).users_lookupByEmail(email=value)
    self._raise_error_if_not_ok(response)
    user_id = response["user"]["id"]
    logger.info(f"storing ID {user_id} in cache, for email {value}")
    self.slack_user_email_cache[value] = user_id
    return user_id

In this case, you can convert an email address into a user ID and tag them in a message like so:

<@{{ 'my_email@domain.com' | lookup_slack_user_by_email }}>

or

<@{{ row['EMAIL_ADDRESS'] | lookup_slack_user_by_email }}>

The Jinja template is rendered after the record actions have been determined. This means that if you're syncing updates to records that use a Jinja template, changes to filter results won't trigger an update. Only changes to the source record will trigger an update.
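
The caching used by lookup_slack_user_by_email matters when many records mention the same person, because each distinct email address costs only one API call. Here is a stand-alone sketch of that pattern; CachedLookup, fake_directory and mention are hypothetical names, and a dictionary lookup stands in for the Slack API call.

```python
from typing import Callable, Dict


class CachedLookup:
    """Hypothetical wrapper that remembers results of an expensive lookup."""

    def __init__(self, fetch: Callable[[str], str]):
        self._fetch = fetch
        self._cache: Dict[str, str] = {}
        self.remote_calls = 0

    def __call__(self, email: str) -> str:
        if email not in self._cache:
            # Only reach out to the remote system on a cache miss
            self.remote_calls += 1
            self._cache[email] = self._fetch(email)
        return self._cache[email]


def mention(user_id: str) -> str:
    """Format a user ID using the mention syntax shown in the templates above."""
    return f"<@{user_id}>"


# Made-up directory standing in for the remote lookup
fake_directory = {"my_email@domain.com": "U123"}
lookup = CachedLookup(lambda email: fake_directory[email])

messages = [mention(lookup("my_email@domain.com")) for _ in range(3)]
print(messages, lookup.remote_calls)
```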

Inbound Syncs

Let's take a look at fetching data from Slack. In this example, we'll just support fetching a list of the channel names.

Inbound syncs work differently to outbound syncs, and typically their configuration is a lot simpler. Instead of describing how source records relate to entities/events in the target app, you're just fetching data from the app in whatever form it's in, to be stored in local tables in Snowflake.

Configuration Form

In this case, we don't need any initial information from the user:

def inbound_configuration_form(self, parameters: InboundSyncConfigurationParameters) -> InboundSyncConfigurationForm:
    return InboundSyncConfigurationForm(fields=[])

Inbound Stream List

Every plugin which supports inbound syncs must also implement the inbound_stream_list function:

def inbound_stream_list(self, parameters: InboundSyncConfigurationParameters) -> List[StreamConfiguration]:
    streams_to_return: List[StreamConfiguration] = []
    streams_to_return.append(
        StreamConfiguration(
            stream_name="conversations_list",
            supported_sync_strategies=[InboundSyncStrategy.FULL_REFRESH],
            source_defined_cursor=True,
            default_cursor_field="updated",
            source_defined_primary_key="id",
            json_schema=CHANNEL_SCHEMA,
        )
    )
    return streams_to_return

This can be a dynamic list of things from the app, but in our case we're just defining "conversations_list" (Slack's internal term for Channels) as the only available stream.

This will be presented to the user like so:

Similarly to sync_outbound, there's a function named sync_inbound which is invoked whenever new records are to be read from the other app.

def sync_inbound(self, parameters: InboundSyncConfigurationParameters, inbound_sync_request: InboundSyncRequest):
    for stream in inbound_sync_request.streams:
        if stream.stream.stream_name == "conversations_list":
            response = self._get_webclient(parameters).conversations_list()
            self._raise_error_if_not_ok(response)
            # 'channels' contains a list of channels, these are the records we want to return
            inbound_sync_request.enqueue_results(stream.stream.stream_name, response["channels"], {})

Here we're just using the Slack webclient to fetch the list, and returning the channels property in the JSON response.
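
The example reads a single response, which is fine for a small workspace. Slack's conversations.list method is cursor-paginated, so a larger workspace may need several requests. The sketch below shows one way to follow the cursor; fetch_all_channels and the fake pages are hypothetical, and fetch_page stands in for a call such as conversations_list(cursor=...).

```python
from typing import Any, Callable, Dict, List, Optional


def fetch_all_channels(
    fetch_page: Callable[[Optional[str]], Dict[str, Any]]
) -> List[Dict[str, Any]]:
    """Follow next_cursor values until the last page has been read."""
    channels: List[Dict[str, Any]] = []
    cursor: Optional[str] = None
    while True:
        page = fetch_page(cursor)
        channels.extend(page["channels"])
        # An empty or missing next_cursor means there are no more pages
        cursor = page.get("response_metadata", {}).get("next_cursor")
        if not cursor:
            return channels


# Made-up pages keyed by the cursor that requests them
fake_pages = {
    None: {"channels": [{"id": "C1"}], "response_metadata": {"next_cursor": "abc"}},
    "abc": {"channels": [{"id": "C2"}], "response_metadata": {"next_cursor": ""}},
}

all_channels = fetch_all_channels(lambda cursor: fake_pages[cursor])
print([channel["id"] for channel in all_channels])
```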

There is a managed_inbound_processing decorator if we wanted to fetch streams in parallel, but we don't need it here.

Next Steps

Now that you've seen how a complete plugin can work, be sure to check out our tutorial on the Omnata Plugin Devkit. It'll show you how to upload a plugin, interactively develop in a notebook, and automate creation of test cases.
