# Example Plugin: Zoho CRM

In this example, we're going to walk through a plugin that loads records into Zoho CRM.

Outbound, it illustrates a record-style scenario, where the aim is to keep records in an external SaaS product in sync with a Snowflake table/view.

Inbound, it's the reverse: we keep warehouse tables in sync with changes being made to CRM records.

Zoho has a complex [API Limits](https://www.zoho.com/crm/developer/docs/api/v5/api-limits.html) system, so we also get a chance to show a more advanced rate limit configuration in the plugin.

The full source code for the plugin is [here](https://github.com/omnata-labs/omnata-plugin-examples/blob/main/zoho_crm/src/plugin.py), but I'm going to step through each section and explain what it does.

Please note, this example is illustrative only and uses a minimal amount of code. A production plugin would typically include a lot more error condition checking and logging statements.

### Structure

After scrolling past the import statements, the first thing to notice is that the plugin is a single file containing a Python class that inherits from `OmnataPlugin`.

{% code overflow="wrap" lineNumbers="true" fullWidth="true" %}

```python
class ZohoCrmPlugin(OmnataPlugin):
    """
    A plugin for the Zoho CRM product.
    Uses the bulk write API to load records into Zoho modules.
    """
```

{% endcode %}

### Manifest

Then we override the `get_manifest` method and return information about the plugin.

{% code overflow="wrap" lineNumbers="true" fullWidth="true" %}

```python
def get_manifest(self) -> PluginManifest:
    return PluginManifest(
        plugin_id="zoho_crm",
        plugin_name="Zoho CRM",
        developer_id="omnata",
        developer_name="Omnata",
        docs_url="https://docs.omnata.com",
        supports_inbound=True,
        supported_outbound_strategies=[CreateSyncStrategy(), UpdateSyncStrategy(), UpsertSyncStrategy()],
    )
```

{% endcode %}

The `plugin_name` determines what the app is named in the Omnata Sync UI:

<figure><img src="https://2119005510-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FedNbhp7XNeTdK7we4Ka5%2Fuploads%2Fgit-blob-cb6d5ae94dd532270f8190e351361b77c452b7cd%2FScreenshot%20from%202023-10-27%2014-05-46.png?alt=media" alt=""><figcaption></figcaption></figure>

We support the Create, Update, and Upsert strategies because those are the record import operations that Zoho's [bulk write API](https://www.zoho.com/crm/developer/docs/api/v3/bulk-write/create-job.html) supports. Since we capture the Zoho record ID during insert and update, it would also be possible to extend the plugin to support deletes via the 'mirror' strategy, but that's beyond the scope of this example.


## Connecting to the app

When a user selects a plugin, the first thing we ask is that they connect to their instance of that application. To render the connection form for the user, the `connection_form` method is called.

In our case, the user will create a Server Based Application in the Zoho API console and provide the Client ID and Client Secret.

Here's the full function:

{% code fullWidth="true" %}

```python
def connection_form(self) -> List[ConnectionMethod]:
    return [
        ConnectionMethod(
            name="OAuth",
            fields=[
                FormDropdownField(name='data_center_domain',label='Data Center',
                required=True,
                data_source=StaticFormOptionsDataSource(values=[
                    FormOption(value='www.zohoapis.com',label='USA',default=True),
                    FormOption(value='www.zohoapis.com.au',label='Australia'),
                    FormOption(value='www.zohoapis.eu',label='Europe'),
                    FormOption(value='www.zohoapis.in',label='India'),
                    FormOption(value='www.zohoapis.com.cn',label='China'),
                    FormOption(value='www.zohoapis.jp',label='Japan')
                ])),
                FormDropdownField(name='product_edition',label='Product Edition',
                help_text="Used to calculate rate limit quota",
                required=True,
                data_source=StaticFormOptionsDataSource(values=[
                    FormOption(value='free',label='Free Edition',default=True),
                    FormOption(value='standard_starter',label='Standard/Starter Edition'),
                    FormOption(value='professional',label='Professional'),
                    FormOption(value='enterprise',label='Enterprise/Zoho One'),
                    FormOption(value='ultimate',label='Ultimate/CRM Plus'),
                ])),
            FormInputField(name='user_count',label='Purchased Users Count',default_value="100",help_text="Used to calculate rate limit quota",required=True),
            FormInputField(name='addon_credits',label='Addon API credits purchased',default_value="0",help_text="Used to calculate rate limit quota",required=True)
            ],
            oauth_template=SecurityIntegrationTemplate(
                oauth_client_id="<Client ID>",
                oauth_client_secret="<Client Secret>",
                oauth_token_endpoint="https://<Account domain (accounts.zoho.x)>/oauth/v2/token",
                oauth_authorization_endpoint="https://<Account domain (accounts.zoho.x)>/oauth/v2/auth",
                oauth_allowed_scopes=["ZohoCRM.bulk.ALL",
                                      "ZohoCRM.settings.modules.READ",
                                      "ZohoCRM.settings.fields.READ",
                                      "ZohoCRM.org.READ",
                                      "ZohoFiles.files.ALL",
                                      "ZohoCRM.modules.ALL"],
            ),
        )
    ]
```

{% endcode %}

There's quite a lot going on here, mainly because we have to gather enough information to determine the rate limits, and also which data centre the account is hosted in (this determines the URLs we hit).

Notice too that our form was wrapped in a ConnectionMethod and we return a list of these. If the app has multiple ways of connecting (e.g. API keys, credentials, SSH keys), we can provide a list of options to the user.

After all connection parameters have been gathered, and any OAuth flows completed, the plugin will be asked which network addresses need to be permitted:

{% code fullWidth="true" %}

```python
def network_addresses(self, parameters: ConnectionConfigurationParameters) -> List[str]:
    data_center_domain=parameters.get_connection_parameter('data_center_domain').value
    return [data_center_domain,
        data_center_domain.replace('www','content'),
        data_center_domain.replace('www','download').replace('.zohoapis.','.zoho.')]
```

{% endcode %}

Notice that the network addresses vary based on which data center the user is in. Of course we could have just added all of the possible URLs, but it's good practice to limit what's permitted.
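To make that concrete, here's a standalone sketch of the same derivation (the helper name here is ours, not part of the plugin API):

{% code fullWidth="true" %}

```python
def zoho_network_addresses(data_center_domain: str) -> list:
    """Derive the API, content and download hosts from the chosen data center domain."""
    return [
        data_center_domain,
        data_center_domain.replace('www', 'content'),
        data_center_domain.replace('www', 'download').replace('.zohoapis.', '.zoho.'),
    ]

# For a US connection:
print(zoho_network_addresses('www.zohoapis.com'))
# → ['www.zohoapis.com', 'content.zohoapis.com', 'download.zoho.com']
```

{% endcode %}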

The user will be shown these (just for their own peace of mind), and then a connection test will be performed by invoking the `connect` function:

{% code fullWidth="true" %}

```python
def connect(self, parameters: ConnectionConfigurationParameters) -> ConnectResponse:
    (base_url,headers) = self.get_auth_details(parameters)
    response = requests.get(f"{base_url}/crm/v5/org",headers=headers)
    if response.status_code != 200:
        raise ValueError(f"Error connecting to Zoho CRM: {response.text}")
    return ConnectResponse(connection_parameters={
        "org_id": response.json()['org'][0]['zgid'],
    })

def get_auth_details(self, parameters: ConnectionConfigurationParameters) -> Tuple[str,Dict]:
    data_center_domain=parameters.get_connection_parameter('data_center_domain').value
    access_token=parameters.get_connection_secret('access_token').value
    return (f"https://{data_center_domain}",{
        "Authorization": f"Zoho-oauthtoken {access_token}"
    })
```

{% endcode %}

Here, we're querying organisation information as a connection test. And because the `zgid` is used by some other API requests, we store it against the connection so that we don't have to fetch it again on each sync run.

Note that "access\_token" is a special parameter created by the OAuth connection process (in other words, you access it like any other Omnata connection form value, instead of using the Snowflake secrets SDK).

Next, we see the API limits defined:

{% code fullWidth="true" %}

```python
def api_limits(self, parameters: SyncConfigurationParameters) -> List[ApiLimits]:
    product_edition=parameters.get_connection_parameter('product_edition').value
    user_count=int(parameters.get_connection_parameter('user_count').value)
    addon_credits=int(parameters.get_connection_parameter('addon_credits').value)
    api_credits = self.api_credits(product_edition,user_count,addon_credits)
    concurrency_limit = self.concurrency_limit(product_edition)
    return [
        ApiLimits(
            endpoint_category="Bulk Write Initialize",
            request_matchers=[HttpRequestMatcher(http_methods=["POST"],url_regex=r"/crm/bulk/v\d/write")],
            request_rates=[RequestRateLimit(request_count=api_credits // 500, time_unit="day", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Bulk Read Initialize",
            request_matchers=[HttpRequestMatcher(http_methods=["POST"],url_regex=r"/crm/bulk/v\d/read")],
            request_rates=[RequestRateLimit(request_count=api_credits // 50, time_unit="day", unit_count=1)],
        ),
        ApiLimits(
            endpoint_category="Record Insert/Update/Read",
            request_matchers=[HttpRequestMatcher(http_methods=["GET","POST","PUT"],url_regex=r"/crm/v\d/\w+$")],
            # approximate: the limit is actually 1 credit per 10 records, with 100 records max per request
            request_rates=[RequestRateLimit(request_count=api_credits // 10, time_unit="day", unit_count=1)],
        )
    ]

```

{% endcode %}

Here we calculate the rate limits for the different endpoints we use, based on which Zoho licences the account has (the `api_credits` function contains this logic).
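The `api_credits` function itself isn't shown here, but as a rough sketch it might look like the following. The figures below are placeholders for illustration only; the real function encodes Zoho's published credit allowances per edition.

{% code fullWidth="true" %}

```python
# Illustrative sketch only -- these base/per-user figures are placeholders,
# not Zoho's actual published numbers.
def api_credits(product_edition: str, user_count: int, addon_credits: int) -> int:
    base = {
        'free': 5000,
        'standard_starter': 50000,
        'professional': 50000,
        'enterprise': 100000,
        'ultimate': 200000,
    }
    per_user = {
        'free': 0,
        'standard_starter': 250,
        'professional': 500,
        'enterprise': 1000,
        'ultimate': 2000,
    }
    # total daily credits = edition base + per-user allowance + purchased addons
    return base[product_edition] + per_user[product_edition] * user_count + addon_credits
```

{% endcode %}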

These rate limits will appear like this to the user, and they can be overridden:

<div data-full-width="true"><figure><img src="https://2119005510-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FedNbhp7XNeTdK7we4Ka5%2Fuploads%2Fgit-blob-a7e60a94374ab1f6c953737874f503e5d8f96f16%2Fimage.png?alt=media" alt=""><figcaption></figcaption></figure></div>

More information on rate limiting can be found at [rate-limiting](https://docs.omnata.com/omnata-product-documentation/omnata-sync-for-snowflake/how-it-works/rate-limiting "mention").

## Outbound Syncs

### Configuration Form

Now that the connection to the app has been established and we know our source table, we can render the form that configures outbound syncs.

The Zoho scenario is fairly straightforward: we need to know which module to load records into and, for the update/upsert strategies, which field to use to match existing records.

{% code lineNumbers="true" fullWidth="true" %}

```python
def outbound_configuration_form(self, parameters: OutboundSyncConfigurationParameters) -> OutboundSyncConfigurationForm:
    fields_list = [
        FormDropdownField(name='module',label='Module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_form_options))
    ]
    update_or_upsert = parameters.sync_strategy in [UpdateSyncStrategy(),UpsertSyncStrategy()]
    if update_or_upsert:
        fields_list.append(FormDropdownField(name='find_by',label='Unique Field',help_text="Matches existing records when updating or upserting",
            depends_on='module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_fields_form_options)))
    
    return OutboundSyncConfigurationForm(
        fields=fields_list,
        mapper=FormFieldMappingSelector(depends_on='find_by' if update_or_upsert else 'module',
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_fields_form_options)))

```

{% endcode %}

With the resulting form, we'll first ask which Zoho module the user wants to load records into. These are populated into a dropdown field via the `fetch_module_form_options` function.

Additionally, if Update or Upsert is used as a strategy, the user must choose a unique field to match existing records on. Note that because it depends on the `module` field, it won't be rendered until after that first selection is made, so the `fetch_module_fields_form_options` function will be provided with that module value as a parameter.

Finally, we use a FormFieldMappingSelector to present the user with a mapping of columns to fields. The list of fields to map onto is sourced via the same `fetch_module_fields_form_options` function used by the `find_by` dropdown.

Here's what the whole form looks like to the user, if they choose the Upsert strategy:

<div data-full-width="true"><figure><img src="https://2119005510-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FedNbhp7XNeTdK7we4Ka5%2Fuploads%2Fgit-blob-d6ee231f482a07083dc0e5f876c73d3a637ff27b%2Fimage.png?alt=media" alt=""><figcaption></figcaption></figure></div>

### Sync the Records

Now for the exciting part!

On the schedule the user chooses, the `sync_outbound` method on our plugin class will be called. It's the job of that method to load the records into the app and report back on the outcome.

First, it's important to know that before this function is invoked, the plugin runtime has already modified Python's `http` package to enforce the rate limits which have been set. This means that you don't need to worry about waiting in between HTTP requests in your code.

In our function, we have:

{% code lineNumbers="true" fullWidth="true" %}

```python
def sync_outbound(self, parameters: OutboundSyncConfigurationParameters, outbound_sync_request: OutboundSyncRequest):
    (base_url, headers) = self.get_auth_details(parameters)
    # bulk APIs require the org id, this was stored during initial connection
    org_id = parameters.get_connection_parameter('org_id').value
    # determine which module we're writing data for
    module_name = parameters.get_sync_parameter('module').value
    # find_by will have been provided if the sync strategy is update/upsert
    find_by:str = None
    if parameters.sync_parameter_exists('find_by'):
        find_by = parameters.get_sync_parameter('find_by').value
    
    # our plugin only supports insert, update and upsert so we should only ever be given Create and Update actions
    records = outbound_sync_request.get_records(batched=True,sync_actions=[CreateSyncAction(),UpdateSyncAction()])
    self.record_upload(data_frame=records,base_url=base_url,headers=headers,org_id=org_id,
                       module_name=module_name,sync_strategy=parameters.sync_strategy,find_by=find_by)

```

{% endcode %}

Here we fetch the records into a batched dataframe, which is passed into the `record_upload` function along with the base URL, auth headers, org ID, module name, sync strategy and `find_by` field.

The `record_upload` function is quite long, so I won't show the whole thing here, but we can discuss it.

The first thing to notice is that the method has the `@managed_outbound_processing` decorator. This decorator is provided by the Omnata Plugin Runtime, and takes care of invoking the function in parallel (if needed), with the appropriate batch size.

In our case, we've set a `concurrency` of 5, so up to five requests can hit Zoho's APIs concurrently (still subject to the rate limits), and the method is invoked with up to 25,000 records at a time in the dataframe, which is the maximum the Zoho bulk APIs will accept.

The advantage here is twofold:

* We don't have to split up the data ourselves
* Working in batches is memory efficient. The plugin runtime will request more data from Snowflake only when needed, so you can process very large payloads with only an XS warehouse.
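Conceptually, the decorator behaves like the sketch below (a simplification of ours, not the actual runtime code; the real decorator also streams batches from Snowflake on demand rather than materialising them all up front):

{% code fullWidth="true" %}

```python
from concurrent.futures import ThreadPoolExecutor

def process_in_batches(records, batch_size, concurrency, process_batch):
    """Split records into fixed-size batches and process up to `concurrency` at once."""
    batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]
    with ThreadPoolExecutor(max_workers=concurrency) as executor:
        # executor.map preserves batch order in the returned results
        return list(executor.map(process_batch, batches))

# Our Zoho settings would be batch_size=25000, concurrency=5; smaller numbers here:
results = process_in_batches(list(range(10)), batch_size=4, concurrency=2,
                             process_batch=len)
print(results)  # → [4, 4, 2]
```

{% endcode %}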

Inside the method, we:

1. Convert the dataframe's 'TRANSFORMED\_RECORD' json column to CSV and zip it up
2. Upload the zip file to the Zoho content API
3. Create a Bulk Write job, pointing to the uploaded file
4. Wait until the job completes
5. Download the import report (a CSV file inside a zip file), parse it into a dataframe, and match the results back onto the original dataframe

The `sync_outbound` method is responsible for returning a dataframe containing:

* `IDENTIFIER` - the identifier provided in the source dataframe
* `APP_IDENTIFIER` - the identifier from the remote system. A value is only necessary if you plan to use it for later updates/deletes
* `SUCCESS` - a boolean flag to indicate success or failure. In our Zoho plugin, we look for a `STATUS` CSV column value of ADDED or UPDATED to indicate success.
* `RESULT` - a JSON object containing further details if necessary, including any error messages

Normally you can use the `enqueue_results` function on the `outbound_sync_request` parameter to send these results back as you process them. Because we're doing our work inside the `@managed_outbound_processing`-annotated method, we can just pass back a response for the records we're given on each iteration.
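As an illustrative sketch (with simplified column names of ours, not Zoho's exact report format), turning the import report back into result rows might look like:

{% code fullWidth="true" %}

```python
import csv
import io

# A toy import report; assumes report rows come back in the same order
# as the records we submitted.
report_csv = """STATUS,RECORD_ID,ERRORS
ADDED,100001,
UPDATED,100002,
SKIPPED,,DUPLICATE_DATA
"""

def report_to_results(report_text, identifiers):
    rows = list(csv.DictReader(io.StringIO(report_text)))
    return [{
        'IDENTIFIER': identifier,
        'APP_IDENTIFIER': row['RECORD_ID'] or None,
        # ADDED/UPDATED statuses indicate the record landed successfully
        'SUCCESS': row['STATUS'] in ('ADDED', 'UPDATED'),
        'RESULT': {'status': row['STATUS'], 'errors': row['ERRORS'] or None},
    } for identifier, row in zip(identifiers, rows)]

results = report_to_results(report_csv, ['rec-1', 'rec-2', 'rec-3'])
print([r['SUCCESS'] for r in results])  # → [True, True, False]
```

{% endcode %}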

## Inbound Syncs

Let's take a look at fetching data from Zoho CRM. In this example, we'll support a full refresh of records from any of the CRM modules.

Inbound syncs work differently to outbound syncs, and typically their configuration is a lot simpler. Instead of describing how source records relate to entities/events in the target app, you're just fetching data from the app in whatever form it's in, to be stored in local tables in Snowflake.

### Configuration Form

In this case, we don't need any initial information from the user:

{% code fullWidth="true" %}

```python
def inbound_configuration_form(self, parameters: InboundSyncConfigurationParameters) -> InboundSyncConfigurationForm:
    return InboundSyncConfigurationForm(fields=[])

```

{% endcode %}

### Inbound Stream List

Every plugin which supports inbound syncs must also implement the `inbound_stream_list` function:

{% code fullWidth="true" %}

```python
def inbound_stream_list(self, parameters: InboundSyncConfigurationParameters) -> List[StreamConfiguration]:
    modules:List[Dict] = self.fetch_modules_from_zoho(parameters)
    return [
        StreamConfiguration(
            stream_name=m['module_name'],
            supported_sync_strategies=[InboundSyncStrategy.FULL_REFRESH],
            source_defined_cursor=True,
            default_cursor_field="Modified_Time",
            source_defined_primary_key="Id",
            json_schema=self.json_schema_for_module(parameters,m['module_name']) \
                if parameters.currently_selected_streams is not None and m['module_name'] in parameters.currently_selected_streams else None,
        ) for m in modules
    ]
```

{% endcode %}

Here we fetch a list of modules from the Zoho API, and add them all to the list of streams.

This will be presented to the user like so:

<div data-full-width="true"><figure><img src="https://2119005510-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FedNbhp7XNeTdK7we4Ka5%2Fuploads%2Fgit-blob-497506894e0011701dc81925b0fca74ac690cd18%2Fimage.png?alt=media" alt=""><figcaption></figcaption></figure></div>

You might have noticed this section:

{% code fullWidth="true" %}

```python
json_schema=self.json_schema_for_module(parameters,m['module_name']) \
    if parameters.currently_selected_streams is not None \
        and m['module_name'] in parameters.currently_selected_streams else None
```

{% endcode %}

When the `inbound_stream_list` function is first invoked, the `currently_selected_streams` list is None, which means we just want a full list of all possible streams. In this case, it's OK to return a `json_schema` of None, since it could be very slow and expensive to ask the system for every possible field of every single object.

However, subsequently (after the user has made stream selections), that list may contain the names of streams, and in that case you should return their schema.

Whenever a `json_schema` is provided, it allows the Omnata engine to generate views over the raw JSON and extract the values, which is a nicer experience for the user.
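For illustration, a simplified, hypothetical `json_schema` for a Leads stream might look like this (the real `json_schema_for_module` function builds it from Zoho's fields metadata API, and would include many more fields):

{% code fullWidth="true" %}

```python
# Hypothetical, cut-down schema for a Zoho "Leads" module
leads_schema = {
    "type": "object",
    "properties": {
        "Id": {"type": "string"},
        "Last_Name": {"type": "string"},
        "Company": {"type": "string"},
        "Annual_Revenue": {"type": "number"},
        "Modified_Time": {"type": "string", "format": "date-time"},
    },
}
```

{% endcode %}

From a schema like this, the engine knows which columns to project in the generated views, and what data type to cast each JSON value to.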

Similarly to `sync_outbound`, there's a function named `sync_inbound` which is invoked whenever new records are to be read from the other app.

{% code fullWidth="true" %}

```python
def sync_inbound(self, parameters: InboundSyncConfigurationParameters, inbound_sync_request: InboundSyncRequest):
    self.fetch_records(inbound_sync_request.streams,
                        parameters=parameters,
                        inbound_sync_request=inbound_sync_request)
```

{% endcode %}

Here we're just passing the list of streams over to the `fetch_records` function, which is decorated with `managed_inbound_processing`.

This method will be invoked in parallel (up to 5 at a time), each invocation passing in a single stream to fetch.

The process uses the bulk read API in a similar fashion to the outbound sync (CSVs inside zip files), however after the data is fetched, we use `enqueue_results` to provide the stream data back to the engine, along with the latest state information.

The state information (`max_modified_time` in our case) isn't strictly necessary for a full refresh, but we'd need it if we were doing incremental loads.
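A minimal sketch of how such a state value could be derived from a fetched batch (assuming ISO-8601 timestamps in a consistent timezone, which compare correctly as plain strings):

{% code fullWidth="true" %}

```python
# After fetching a batch of records, keep the highest Modified_Time so a
# future incremental run could request only rows modified after it.
records = [
    {"Id": "1", "Modified_Time": "2023-10-01T09:00:00+10:00"},
    {"Id": "2", "Modified_Time": "2023-10-05T14:30:00+10:00"},
]
new_state = {"max_modified_time": max(r["Modified_Time"] for r in records)}
print(new_state)  # → {'max_modified_time': '2023-10-05T14:30:00+10:00'}
```

{% endcode %}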

### Next Steps

Now that you've seen how a complete plugin can work, be sure to check out our tutorial on the Omnata Plugin Devkit. It'll show you how to upload a plugin, interactively develop in a notebook, and automate creation of test cases.
