Example Plugin: Zoho CRM

In this example, we're going to walk through a plugin that loads records into Zoho CRM and reads records back out of it.

In the outbound direction, it illustrates a record-style scenario, where the aim is to keep records in an external SaaS product in sync with a Snowflake table or view.

In the inbound direction, we keep warehouse tables in sync with changes made to CRM records.

Zoho has a complex API limits system, so this plugin also gives us a chance to configure rate limits in a more advanced way.

The full source code for the plugin is here, but I'm going to step through each section and explain what it does.

Please note that this example is illustrative only and uses a minimal amount of code. A production plugin would typically include much more error checking and logging.

Structure

After scrolling past the import statements, the first thing to notice is that the plugin is a single file containing a Python class that inherits from OmnataPlugin.

class ZohoCrmPlugin(OmnataPlugin):
    """
    A plugin for the Zoho CRM product.
    Uses the bulk write API to load records into Zoho modules.
    """

Manifest

Then we override the get_manifest method and return information about the plugin.

def get_manifest(self) -> PluginManifest:
    return PluginManifest(
        plugin_id="zoho_crm",
        plugin_name="Zoho CRM",
        developer_id="omnata",
        developer_name="Omnata",
        docs_url="https://docs.omnata.com",
        supports_inbound=True,
        supported_outbound_strategies=[CreateSyncStrategy(), UpdateSyncStrategy(), UpsertSyncStrategy()],
    )

The plugin_name determines how the app is named in the Omnata Sync UI.

We support the Create, Update, and Upsert strategies because those are the record import operations that Zoho's bulk write API supports. Since we capture the Zoho record ID during insert and update, it would also be possible to extend the plugin to support deletes via the 'mirror' strategy, but that's beyond the scope of this example.


Connecting to the app

When a user selects a plugin, the first thing we ask is that they connect to their instance of that application, so the connection_form method is called to render a form for the user.

In our case, the user will create a Server Based Application in the Zoho API console and provide the Client ID and Client Secret.

Here's the full function:

def connection_form(self) -> List[ConnectionMethod]:
    return [
        ConnectionMethod(
            name="OAuth",
            fields=[
                FormDropdownField(
                    name='data_center_domain',
                    label='Data Center',
                    required=True,
                    data_source=StaticFormOptionsDataSource(values=[
                        FormOption(value='www.zohoapis.com', label='USA', default=True),
                        FormOption(value='www.zohoapis.com.au', label='Australia'),
                        FormOption(value='www.zohoapis.eu', label='Europe'),
                        FormOption(value='www.zohoapis.in', label='India'),
                        FormOption(value='www.zohoapis.com.cn', label='China'),
                        FormOption(value='www.zohoapis.jp', label='Japan'),
                    ])),
                FormDropdownField(
                    name='product_edition',
                    label='Product Edition',
                    help_text="Used to calculate rate limit quota",
                    required=True,
                    data_source=StaticFormOptionsDataSource(values=[
                        FormOption(value='free', label='Free Edition', default=True),
                        FormOption(value='standard_starter', label='Standard/Starter Edition'),
                        FormOption(value='professional', label='Professional'),
                        FormOption(value='enterprise', label='Enterprise/Zoho One'),
                        FormOption(value='ultimate', label='Ultimate/CRM Plus'),
                    ])),
                FormInputField(name='user_count', label='Purchased Users Count', default_value="100",
                               help_text="Used to calculate rate limit quota", required=True),
                FormInputField(name='addon_credits', label='Addon API credits purchased', default_value="0",
                               help_text="Used to calculate rate limit quota", required=True),
            ],
            oauth_template=SecurityIntegrationTemplate(
                oauth_client_id="<Client ID>",
                oauth_client_secret="<Client Secret>",
                oauth_token_endpoint="https://<Account domain (accounts.zoho.x)>/oauth/v2/token",
                oauth_authorization_endpoint="https://<Account domain (accounts.zoho.x)>/oauth/v2/auth",
                oauth_allowed_scopes=[
                    "ZohoCRM.bulk.ALL",
                    "ZohoCRM.settings.modules.READ",
                    "ZohoCRM.settings.fields.READ",
                    "ZohoCRM.org.READ",
                    "ZohoFiles.files.ALL",
                    "ZohoCRM.modules.ALL",
                ],
            ),
        )
    ]

There's quite a lot going on here, mainly because we need enough information to determine the rate limits, and also which data center the account is hosted in (this determines the URLs we call).

Notice too that our form was wrapped in a ConnectionMethod and we return a list of these. If the app has multiple ways of connecting (e.g. API keys, credentials, SSH keys), we can provide a list of options to the user.
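The data center choice also matters for the OAuth template, which leaves <Account domain (accounts.zoho.x)> as a placeholder. As a hypothetical convenience, a helper like the one below could work out the token and authorization endpoints from the selected data center. It assumes the accounts host follows the accounts.zoho.<suffix> pattern (for example, www.zohoapis.eu becomes accounts.zoho.eu); check this against Zoho's multi-data-center documentation before relying on it.

```python
# Hypothetical helper, not part of the plugin shown above.
# Assumption: the Zoho Accounts (OAuth) host is "accounts.zoho." followed by
# the same suffix as the API domain chosen in the "Data Center" dropdown.

def accounts_domain(data_center_domain: str) -> str:
    prefix = "www.zohoapis."
    if not data_center_domain.startswith(prefix):
        raise ValueError(f"Unexpected data center domain: {data_center_domain}")
    return "accounts.zoho." + data_center_domain[len(prefix):]


def oauth_endpoints(data_center_domain: str) -> dict:
    """Fills in the <Account domain> placeholder used in the OAuth template."""
    domain = accounts_domain(data_center_domain)
    return {
        "oauth_token_endpoint": f"https://{domain}/oauth/v2/token",
        "oauth_authorization_endpoint": f"https://{domain}/oauth/v2/auth",
    }


if __name__ == "__main__":
    for dc in ["www.zohoapis.com", "www.zohoapis.com.au", "www.zohoapis.eu"]:
        print(dc, "->", oauth_endpoints(dc)["oauth_token_endpoint"])
```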

After all connection parameters have been gathered, and any OAuth flows completed, the plugin will be asked which network addresses need to be permitted:

def network_addresses(self, parameters: ConnectionConfigurationParameters) -> List[str]:
    data_center_domain=parameters.get_connection_parameter('data_center_domain').value
    return [data_center_domain,
        data_center_domain.replace('www','content'),
        data_center_domain.replace('www','download').replace('.zohoapis.','.zoho.')]

Notice that the network addresses vary depending on which data center the user is in. We could have simply allowed all of the possible URLs, but it's good practice to limit what's permitted.
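To see what the string manipulation in network_addresses produces for each Data Center option, here is the same logic pulled out into a standalone function. The comments on what each host is used for are inferred from the upload and download steps described later in this page, so treat them as assumptions.

```python
from typing import List

# The values offered by the "Data Center" dropdown in connection_form
DATA_CENTER_DOMAINS = [
    "www.zohoapis.com",
    "www.zohoapis.com.au",
    "www.zohoapis.eu",
    "www.zohoapis.in",
    "www.zohoapis.com.cn",
    "www.zohoapis.jp",
]


def hosts_for(data_center_domain: str) -> List[str]:
    """Same transformations as network_addresses(), minus the Omnata parameters."""
    return [
        data_center_domain,  # REST and bulk APIs
        data_center_domain.replace("www", "content"),  # presumably the file upload (content) API
        # presumably where bulk job result files are downloaded from
        data_center_domain.replace("www", "download").replace(".zohoapis.", ".zoho."),
    ]


if __name__ == "__main__":
    for dc in DATA_CENTER_DOMAINS:
        print(hosts_for(dc))
```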

The user is shown these addresses (just for their own peace of mind), and then a connection test is performed by invoking the connect method:

def connect(self, parameters: ConnectionConfigurationParameters) -> ConnectResponse:
    (base_url,headers) = self.get_auth_details(parameters)
    response = requests.get(f"{base_url}/crm/v5/org",headers=headers)
    if response.status_code != 200:
        raise ValueError(f"Error connecting to Zoho CRM: {response.text}")
    return ConnectResponse(connection_parameters={
        "org_id": response.json()['org'][0]['zgid'],
    })

def get_auth_details(self, parameters: ConnectionConfigurationParameters) -> Tuple[str,Dict]:
    data_center_domain=parameters.get_connection_parameter('data_center_domain').value
    access_token=parameters.get_connection_secret('access_token').value
    return (f"https://{data_center_domain}",{
        "Authorization": f"Zoho-oauthtoken {access_token}"
    })

Here, we're querying organisation information as a connection test. Because the 'zgid' is required by some other API requests, we store it against the connection (as org_id) so that we don't have to fetch it on every sync run.

Note that "access_token" is a special parameter created by the OAuth connection process. In other words, you read it with get_connection_secret, just like any other Omnata connection form value, instead of using the Snowflake secrets SDK.
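The zgid lookup in connect can be made a little more defensive, so that an unexpected response produces a clear error instead of a KeyError or IndexError. This minimal sketch assumes the /crm/v5/org response has the shape implied by the code above (an "org" list whose first entry carries "zgid"); extract_org_id is a hypothetical helper name and the sample payload is made up.

```python
from typing import Any, Dict


def extract_org_id(payload: Dict[str, Any]) -> str:
    """Returns the zgid from an /org response, or raises a descriptive error."""
    orgs = payload.get("org") or []
    if not orgs or "zgid" not in orgs[0]:
        raise ValueError("Zoho org response did not contain a zgid")
    return str(orgs[0]["zgid"])


if __name__ == "__main__":
    sample = {"org": [{"zgid": "12345678"}]}  # hypothetical, trimmed response body
    print(extract_org_id(sample))  # 12345678
```

Inside connect, response.json()['org'][0]['zgid'] would then become extract_org_id(response.json()).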

Next, we see the API limits defined:

def api_limits(self, parameters: SyncConfigurationParameters) -> List[ApiLimits]:
    product_edition = parameters.get_connection_parameter('product_edition').value
    user_count = int(parameters.get_connection_parameter('user_count').value)
    addon_credits = int(parameters.get_connection_parameter('addon_credits').value)
    api_credits = self.api_credits(product_edition, user_count, addon_credits)
    concurrency_limit = self.concurrency_limit(product_edition)
    # raw strings (r"...") keep \d and \w as regex escapes instead of invalid Python escapes
    return [
        ApiLimits(
            endpoint_category="Bulk Write Initialize",
            request_matchers=[HttpRequestMatcher(http_methods=["POST"], url_regex=r"/crm/bulk/v\d/write")],
            request_rates=[RequestRateLimit(request_count=api_credits / 500, time_unit="day", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Bulk Read Initialize",
            request_matchers=[HttpRequestMatcher(http_methods=["POST"], url_regex=r"/crm/bulk/v\d/read")],
            request_rates=[RequestRateLimit(request_count=api_credits / 50, time_unit="day", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Record Insert/Update/Read",
            request_matchers=[HttpRequestMatcher(http_methods=["GET", "POST", "PUT"], url_regex=r"/crm/v\d/\w+$")],
            # this is approximate, the limit is actually 1 credit per 10 records, with 100 records max per request
            request_rates=[RequestRateLimit(request_count=api_credits / 10, time_unit="day", unit_count=1)],
        ),
    ]

Here we calculate the rate limits for each of the endpoints we use, based on which Zoho licenses the account has (the api_credits function contains this logic).

These rate limits are shown to the user, who can override them.

More information on rate limits can be found in Rate Limiting.
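To make the three ApiLimits entries more concrete, the sketch below copies their HTTP methods, regex patterns and credit divisors into a standalone classifier and works out the daily request budget for a given credit total. It assumes the runtime matches url_regex with re.search semantics; that is a guess for illustration, not a description of Omnata's matcher.

```python
import re
from typing import Dict, Optional

# (category, HTTP methods, url_regex, credits divisor), copied from api_limits()
CATEGORIES = [
    ("Bulk Write Initialize", {"POST"}, r"/crm/bulk/v\d/write", 500),
    ("Bulk Read Initialize", {"POST"}, r"/crm/bulk/v\d/read", 50),
    ("Record Insert/Update/Read", {"GET", "POST", "PUT"}, r"/crm/v\d/\w+$", 10),
]


def classify(method: str, path: str) -> Optional[str]:
    """Returns the first category whose methods and pattern match, else None."""
    for name, methods, pattern, _ in CATEGORIES:
        if method in methods and re.search(pattern, path):
            return name
    return None


def daily_budgets(api_credits: int) -> Dict[str, float]:
    """Requests per day allowed in each category for a given credit total."""
    return {name: api_credits / divisor for name, _, _, divisor in CATEGORIES}


if __name__ == "__main__":
    print(classify("POST", "/crm/bulk/v5/write"))  # Bulk Write Initialize
    print(classify("GET", "/crm/v5/Leads"))        # Record Insert/Update/Read
    print(daily_budgets(100_000))
```

With 100,000 credits, for example, that works out to 200 bulk write initializations, 2,000 bulk read initializations and 10,000 record requests per day. Note that, under these patterns, the GET /crm/v5/org request used in connect would also match the record category.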

Outbound Syncs

Configuration Form

Now that the connection to the app has been established and we know our source table, we can render the form that configures outbound syncs.

The Zoho scenario needs a few pieces of information: which module to load records into, which field identifies existing records (for updates and upserts), and how source columns map onto Zoho fields.

def outbound_configuration_form(self, parameters: OutboundSyncConfigurationParameters) -> OutboundSyncConfigurationForm:
    fields_list = [
        FormDropdownField(name='module',label='Module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_form_options))
    ]
    update_or_upsert = parameters.sync_strategy in [UpdateSyncStrategy(),UpsertSyncStrategy()]
    if update_or_upsert:
        fields_list.append(FormDropdownField(name='find_by',label='Unique Field',help_text="Matches existing records when updating or upserting",
            depends_on='module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_fields_form_options)))
    
    return OutboundSyncConfigurationForm(
        fields=fields_list,
        mapper=FormFieldMappingSelector(depends_on='find_by' if update_or_upsert else 'module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_fields_form_options)))

With the resulting form, we'll first ask which Zoho module the user wants to load records into. These are populated into a dropdown field via the fetch_module_form_options function.

Additionally, if the Upsert or Update strategy is used, the user must choose a unique field to match existing records on. Because this field depends on the module field, it won't be rendered until after that first selection is made, so the fetch_module_fields_form_options function will be provided with the module value as a parameter.

Finally, we use a FormFieldMappingSelector to present the user with a mapping of columns to fields. The list of fields to map onto is sourced via the same fetch_module_fields_form_options function used by the find_by dropdown.

If the user chooses the Upsert strategy, the form shows all three elements: the Module dropdown, the Unique Field dropdown, and the field mapper.

Sync the Records

Now for the exciting part!

On the schedule the user chooses, the sync_outbound method on our plugin class is called. Its job is to load the records into the app and report back on the outcome.

First, it's important to know that before this method is invoked, the plugin runtime has already modified Python's http package to enforce the rate limits that have been set. This means you don't need to wait between HTTP requests in your own code.

In our function, we have:

def sync_outbound(self, parameters: OutboundSyncConfigurationParameters, outbound_sync_request: OutboundSyncRequest):
    (base_url, headers) = self.get_auth_details(parameters)
    # bulk APIs require the org id, this was stored during initial connection
    org_id = parameters.get_connection_parameter('org_id').value
    # determine which module we're writing data for
    module_name = parameters.get_sync_parameter('module').value
    # find_by will have been provided if the sync strategy is update/upsert
    find_by:str = None
    if parameters.sync_parameter_exists('find_by'):
        find_by = parameters.get_sync_parameter('find_by').value
    
    # our plugin only supports insert, update and upsert so we should only ever be given Create and Update actions
    records = outbound_sync_request.get_records(batched=True,sync_actions=[CreateSyncAction(),UpdateSyncAction()])
    self.record_upload(data_frame=records,base_url=base_url,headers=headers,org_id=org_id,
                       module_name=module_name,sync_strategy=parameters.sync_strategy,find_by=find_by)

Here we fetch the records as a batched dataframe, which is passed into the record_upload function along with the base URL, auth headers, org ID, module name, sync strategy and find_by field.

The record_upload function is quite long, so I won't show the whole thing here, but we can discuss it.

The first thing to notice is that the method has the @managed_outbound_processing decorator. This decorator is provided by the Omnata Plugin Runtime, and takes care of invoking the function in parallel (if needed), with the appropriate batch size.

In our case, we've set a concurrency of 5, so that up to five requests can hit Zoho's APIs concurrently (still subject to the rate limits), and a batch size of 25,000, so that the method is invoked with up to 25,000 records at a time in the dataframe, which is the maximum that the Zoho bulk APIs accept.

The advantage here is twofold:

  • We don't have to split up the data ourselves

  • Working in batches is memory efficient. The plugin runtime requests more data from Snowflake only when needed, so you can process very large payloads with only an XS warehouse.
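The sketch below is a rough, standard-library-only illustration of the work the decorator takes off your hands: splitting records into batches of up to 25,000 and processing up to five batches at once. It is not the runtime's implementation, and upload_batch is a hypothetical placeholder for the real bulk write logic.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List, Sequence

BATCH_SIZE = 25_000  # max records per Zoho bulk write job, per the text above
CONCURRENCY = 5      # matches the concurrency described above


def batches(records: Sequence[dict], size: int = BATCH_SIZE) -> Iterator[Sequence[dict]]:
    """Yields consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def upload_batch(batch: Sequence[dict]) -> int:
    # Hypothetical placeholder: a real plugin would zip, upload and run a bulk write job here.
    return len(batch)


def process_all(records: Sequence[dict]) -> List[int]:
    """Runs upload_batch over every batch, at most CONCURRENCY at a time, keeping batch order."""
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        return list(pool.map(upload_batch, batches(records)))


if __name__ == "__main__":
    print(process_all([{"id": i} for i in range(60_000)]))  # [25000, 25000, 10000]
```

Unlike the runtime, this sketch holds every record in memory up front, which is exactly the cost that fetching from Snowflake on demand avoids.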

Inside the method, we:

  1. Convert the dataframe's 'TRANSFORMED_RECORD' json column to CSV and zip it up

  2. Upload the zip file to the Zoho content API

  3. Create a Bulk Write job, pointing to the uploaded file

  4. Wait until the job completes

  5. Download the import report (a CSV file inside a zip file), parse it into a dataframe, and match the results back onto the original dataframe
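Step 1 can be done with Python's standard library alone. This sketch assumes each TRANSFORMED_RECORD value is a flat JSON object (or an already-parsed dict) keyed by Zoho field API names; the function name and the file name inside the zip (records.csv) are arbitrary choices for illustration.

```python
import csv
import io
import json
import zipfile
from typing import Iterable, List, Union


def records_to_zipped_csv(
    transformed_records: Iterable[Union[str, dict]], csv_name: str = "records.csv"
) -> bytes:
    """Converts TRANSFORMED_RECORD values into an in-memory zip holding one CSV file."""
    # Values may arrive as JSON strings or as already-parsed dicts
    rows: List[dict] = [json.loads(r) if isinstance(r, str) else dict(r) for r in transformed_records]
    # Union of all keys in first-seen order, so every mapped field gets a column
    columns = list(dict.fromkeys(key for row in rows for key in row))
    text = io.StringIO()
    writer = csv.DictWriter(text, fieldnames=columns)  # missing keys are written as ""
    writer.writeheader()
    writer.writerows(rows)
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(csv_name, text.getvalue())
    return buffer.getvalue()


if __name__ == "__main__":
    payload = records_to_zipped_csv(
        ['{"Last_Name": "Smith", "Email": "a@example.com"}', '{"Last_Name": "Jones"}']
    )
    with zipfile.ZipFile(io.BytesIO(payload)) as zf:
        print(zf.read("records.csv").decode("utf-8"))
```

The resulting bytes are what would be uploaded to the content API in step 2. Nested values (lists or objects) would need flattening first, since csv writes them using their Python string form.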

The record_upload method is responsible for returning a dataframe containing:

  • IDENTIFIER - the identifier provided in the source dataframe

  • APP_IDENTIFIER - the identifier from the remote system. A value is only necessary if you plan to update or delete records using it.

  • SUCCESS - a boolean flag to indicate success or failure. In our Zoho plugin, we look for a STATUS csv column value of ADDED or UPDATED to indicate success.

  • RESULT - a JSON object containing further details if necessary, including any error messages
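Step 5 and the result columns above fit together roughly as follows. This sketch assumes the import report contains STATUS, RECORD_ID and ERRORS columns and that its rows come back in the same order as the uploaded CSV; both are assumptions to verify against Zoho's bulk write documentation, and build_results is a hypothetical name. It returns plain dicts, which could be turned into the results dataframe with pandas.DataFrame(results).

```python
import csv
import io
from typing import Dict, List, Sequence

# STATUS values treated as success, as described above
SUCCESS_STATUSES = {"ADDED", "UPDATED"}


def build_results(identifiers: Sequence[str], report_csv: str) -> List[Dict]:
    """Pairs each uploaded record's IDENTIFIER with its row in the import report."""
    report_rows = list(csv.DictReader(io.StringIO(report_csv)))
    if len(report_rows) != len(identifiers):
        raise ValueError("Report row count does not match the uploaded batch")
    results = []
    # Assumption: report rows are in the same order as the uploaded rows
    for identifier, row in zip(identifiers, report_rows):
        results.append({
            "IDENTIFIER": identifier,
            "APP_IDENTIFIER": row.get("RECORD_ID") or None,  # empty string -> None
            "SUCCESS": row.get("STATUS") in SUCCESS_STATUSES,
            "RESULT": {"status": row.get("STATUS"), "errors": row.get("ERRORS") or None},
        })
    return results


if __name__ == "__main__":
    report = "STATUS,RECORD_ID,ERRORS\nADDED,111,\nSKIPPED,,invalid email\n"  # made-up sample
    for r in build_results(["src-1", "src-2"], report):
        print(r)
```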

Normally you can use the enqueue_results function on the outbound_sync_request parameter to send these results back as you process them. Because we're doing our work inside the @managed_outbound_processing-decorated method, we can simply return a response for the records we're given on each invocation.

Inbound Syncs

Let's take a look at fetching data from Zoho CRM. In this example, each Zoho module is made available as a stream.

Inbound syncs work differently to outbound syncs, and typically their configuration is a lot simpler. Instead of describing how source records relate to entities/events in the target app, you're just fetching data from the app in whatever form it's in, to be stored in local tables in Snowflake.

Configuration Form

In this case, we don't need any initial information from the user:

def inbound_configuration_form(self, parameters: InboundSyncConfigurationParameters) -> InboundSyncConfigurationForm:
    return InboundSyncConfigurationForm(fields=[])

Inbound Stream List

Every plugin which supports inbound syncs must also implement the inbound_stream_list function:

def inbound_stream_list(self, parameters: InboundSyncConfigurationParameters) -> List[StreamConfiguration]:
    modules:List[Dict] = self.fetch_modules_from_zoho(parameters)
    return [
        StreamConfiguration(
            stream_name=m['module_name'],
            supported_sync_strategies=[InboundSyncStrategy.FULL_REFRESH],
            source_defined_cursor=True,
            default_cursor_field="Modified_Time",
            source_defined_primary_key="Id",
            json_schema=self.json_schema_for_module(parameters,m['module_name']) \
                if parameters.currently_selected_streams is not None and m['module_name'] in parameters.currently_selected_streams else None,
        ) for m in modules
    ]

Here we fetch a list of modules from the Zoho API, and add them all to the list of streams.

The user is then presented with this list of streams to select from.

You might have noticed this section:

json_schema=self.json_schema_for_module(parameters,m['module_name']) \
                    if parameters.currently_selected_streams is not None and m['module_name'] in parameters.currently_selected_streams else None

When the inbound_stream_list function is invoked, initially the currently_selected_streams list is None, which means we just want a full list of all possible streams. When this happens, it's OK to return a json_schema of None, as it could be very slow and expensive to ask the system for every possible field for every single object.

However, subsequently (after the user has made stream selections), that list may contain the names of streams, and in that case you should return their schema.
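The pattern is easy to check in isolation. In this sketch, stream_schemas is a hypothetical function and fetch_schema is a stand-in for json_schema_for_module; the recorded calls show that schemas are only requested for streams that are already selected.

```python
from typing import Callable, Dict, List, Optional


def stream_schemas(
    module_names: List[str],
    currently_selected: Optional[List[str]],
    fetch_schema: Callable[[str], Dict],
) -> Dict[str, Optional[Dict]]:
    """Fetches a schema only for selected streams; everything else gets None."""
    selected = set(currently_selected or [])  # None means "list everything, no schemas"
    return {name: fetch_schema(name) if name in selected else None for name in module_names}


if __name__ == "__main__":
    calls: List[str] = []

    def fake_fetch(name: str) -> Dict:
        calls.append(name)
        return {"type": "object"}

    modules = ["Leads", "Contacts", "Deals"]
    print(stream_schemas(modules, None, fake_fetch))       # every value is None
    print(stream_schemas(modules, ["Deals"], fake_fetch))  # only Deals has a schema
    print(calls)  # ['Deals']
```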

Whenever a json_schema is provided, it allows the Omnata engine to generate views over the raw JSON and extract the values, which is a nicer experience for the user.
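As a rough idea of what json_schema_for_module might return, the hypothetical sketch below turns a list of field descriptors into a JSON Schema. It assumes each descriptor carries api_name and data_type keys, as Zoho's fields metadata does, and the type mapping is illustrative rather than complete: anything unmapped falls back to string, and treating lookups as objects is an assumption.

```python
from typing import Dict, Iterable

# Illustrative Zoho data_type -> JSON Schema type mapping (assumed, not exhaustive)
ZOHO_TO_JSON_TYPE = {
    "integer": "integer",
    "bigint": "integer",
    "double": "number",
    "currency": "number",
    "boolean": "boolean",
    "lookup": "object",  # assumed: lookup values come back as {"id": ..., "name": ...}
}

# Date-like types are kept as strings but annotated with a format
ZOHO_STRING_FORMATS = {"datetime": "date-time", "date": "date"}


def json_schema_for_fields(fields: Iterable[Dict]) -> Dict:
    properties: Dict[str, Dict] = {}
    for field in fields:
        data_type = field.get("data_type")
        # "null" is allowed because CRM fields are frequently empty
        prop: Dict = {"type": [ZOHO_TO_JSON_TYPE.get(data_type, "string"), "null"]}
        if data_type in ZOHO_STRING_FORMATS:
            prop["format"] = ZOHO_STRING_FORMATS[data_type]
        properties[field["api_name"]] = prop
    return {"type": "object", "properties": properties}


if __name__ == "__main__":
    sample_fields = [  # hypothetical descriptors
        {"api_name": "Last_Name", "data_type": "text"},
        {"api_name": "Annual_Revenue", "data_type": "currency"},
        {"api_name": "Modified_Time", "data_type": "datetime"},
    ]
    print(json_schema_for_fields(sample_fields))
```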

Similarly to sync_outbound, there's a function named sync_inbound which is invoked whenever new records are to be read from the other app.

def sync_inbound(self, parameters: InboundSyncConfigurationParameters, inbound_sync_request: InboundSyncRequest):
    self.fetch_records(inbound_sync_request.streams,
                        parameters=parameters,
                        inbound_sync_request=inbound_sync_request)

Here we're just passing the list of streams over to the fetch_records function, which is decorated with managed_inbound_processing.

This method will be invoked in parallel (up to 5 at a time), each invocation passing in a single stream to fetch.

The process uses the bulk read API in a similar fashion to the outbound sync (CSVs inside zip files); however, after the data is fetched, we use enqueue_results to provide the stream data back to the engine, along with the latest state information.

The state information (max_modified_time in our case) isn't strictly necessary, since the streams only support full refresh, but we'd need it if we were doing incremental loads.
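If incremental loads were added, the state value would need to be computed carefully. This sketch assumes Modified_Time values are ISO 8601 strings with a UTC offset (for example 2024-03-01T09:30:00+11:00) and compares them as timezone-aware datetimes, because plain string comparison can order timestamps with different offsets incorrectly. The function name and the previous parameter are hypothetical.

```python
from datetime import datetime
from typing import Iterable, Optional


def max_modified_time(records: Iterable[dict], previous: Optional[str] = None) -> Optional[str]:
    """Latest Modified_Time across a batch (and any previously stored state), or None."""
    candidates = [r["Modified_Time"] for r in records if r.get("Modified_Time")]
    if previous:
        candidates.append(previous)
    if not candidates:
        return None
    # Compare as aware datetimes so values with different UTC offsets are ordered correctly
    return max(candidates, key=datetime.fromisoformat)


if __name__ == "__main__":
    batch = [
        {"Id": "1", "Modified_Time": "2024-03-01T09:30:00+11:00"},  # 22:30 UTC on Feb 29
        {"Id": "2", "Modified_Time": "2024-02-29T23:00:00+00:00"},  # 23:00 UTC on Feb 29
    ]
    print(max(r["Modified_Time"] for r in batch))  # string max picks the earlier instant
    print(max_modified_time(batch))                # 2024-02-29T23:00:00+00:00
```

In an incremental sync, this value would be stored as the stream's state and used on the next run to request only records modified after it.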

Next Steps

Now that you've seen how a complete plugin can work, be sure to check out our tutorial on the Omnata Plugin Devkit. It'll show you how to upload a plugin, interactively develop in a notebook, and automate creation of test cases.
