
Creating ContextStreams

Create I/O Streams

Create the required input and output streams from the I/O Streams tab.

  1. On the ContextStreams page, navigate to the I/O Streams tab (the default tab). Click on the + button to create an I/O Stream.



  2. On clicking, you will be redirected to the I/O stream creation page.


  3. Provide the following fields:
    • Name – Enter a unique name for the I/O stream.
    • Description – Add a brief description of the I/O stream.
    • Partition – The number of partitions for the I/O stream. The appropriate value depends on factors like data volume, system resources, and processing requirements. More partitions can enhance parallelism and increase throughput, but excessive partitions add resource overhead. Consider the data characteristics and system capabilities to find the optimal balance for efficient I/O stream processing.
    • Retention Period – The time for which the I/O stream should hold the data available. Choose based on the requirements, considering factors like analysis frequency, compliance, and storage capacity.
    • Retention Unit – The unit (in Hours/Days) of the retention period.
    • Advanced Settings (optional) – Advanced Settings let you provide additional I/O stream configuration in JSON format (a sample is shown after these steps). Some of the most commonly used settings are as follows:
      1. cleanup.policy:
        • Type: list, Default: delete, Valid Values: [compact, delete]
        • Controls the retention policy for log segments. ‘delete’ discards old segments based on time or size, while ‘compact’ enables log compaction, retaining the latest value for each key.
      2. compression.type:
        • Type: string, Default: producer, Valid Values: [uncompressed, zstd, lz4, snappy, gzip, producer]
        • Specifies the final compression type for a topic. ‘producer’ retains the original compression codec set by the producer.
      3. max.message.bytes:
        • Type: int, Default: 1048588, Valid Values: [0,…]
        • Sets the largest record batch size allowed by Kafka after compression. Influences fetch size for consumers older than 0.10.2.
      4. retention.bytes:
        • Type: long, Default: -1
        • Controls the maximum size a partition can grow to before discarding old log segments using the “delete” retention policy.
      5. retention.ms:
        • Type: long, Default: 604800000 (7 days)
        • Determines the maximum time a log is retained before discarding old segments using the “delete” retention policy.
  4. Click on the Save button to add the I/O stream.
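
For reference, a minimal Advanced Settings sketch built only from the properties described above might look like the following. The values are purely illustrative (here, keeping the default cleanup policy and message size while compressing the topic with lz4); adjust them to your own data volume and retention needs.

  {
    "cleanup.policy": "delete",
    "compression.type": "lz4",
    "max.message.bytes": 1048588
  }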

To debug I/O Streams, please follow the instructions provided here. To manage the added I/O Streams, refer to the instructions here.

Create Data Pipeline

To initiate the creation of a Data Pipeline, follow these steps:

  1. On the ContextStreams page, navigate to the Pipelines tab. Click on the + button to begin creating a new Data Pipeline.
  2. Fill in the required details:
    • Name: Provide a meaningful unique name for the pipeline.
    • Description: Enter a brief description outlining the expected processing.
    • Specify pipeline deployment details:
      1. Instances: Specify the number of instances of the pipeline to be deployed. Decide based on workload distribution, resource utilization, and future scalability needs.
      2. Threads per instance: Define the number of worker threads per instance. Consider concurrency, resource utilization, and task complexity for efficient parallel processing.

        💡Note: The default values for instances and threads per instance are both set to 1. It’s crucial to examine data lag: compare the rate of incoming data in the input stream with the pipeline’s processing rate. If these numbers do not align, consider adjusting the thread count. For example, if the input stream receives about 6,000 events per second and each thread processes roughly 1,500 events per second, a single instance needs at least 4 threads to keep up. It is recommended not to go beyond 6 threads per instance.
    • Specify Resource Constraints: Resource constraints control the memory and CPU allocation to each pipeline instance, defining both minimum and maximum limits to ensure efficient resource utilization and prevent overallocation or underallocation.
      1. Minimum Memory (Mi): The minimum amount of memory allocated to a single pipeline instance. Default is 128Mi.
      2. Maximum Memory (Mi): The maximum amount of memory allocated to a single pipeline instance. Default is 500Mi.
      3. Minimum Cores (m): The minimum amount of CPU allocated to a single pipeline instance. Default is 250m.
      4. Maximum Cores (m): The maximum amount of CPU allocated to a single pipeline instance. Default is 1000m.
    • Advanced Settings (optional): Include a JSON configuration with advanced pipeline properties if needed. For more details on advanced settings related to the data pipeline configuration, refer here.
  3. Click on the Save button to add the pipeline.



Upon clicking “Save,” you will be directed to the Pipeline Editor page, where you can further configure and customize the created pipeline.

Working with Pipeline Editor

After creating the data pipeline, the Pipeline Editor page will provide a visual representation of the pipeline with multiple blocks:

  • Input Streams: The first column of blocks is designated for adding Input Streams to the pipeline.
  • Output Streams: The last column of blocks is for adding Output Streams to the pipeline.
  • Intermediate Blocks: Use these blocks to add processing steps between the input and output streams. To add more rows/columns, click on the “+” sign.

To configure the pipeline:

  1. Adding Input Streams: Click on the first column to select the desired input stream for the data pipeline.



  2. Adding Output Streams: Click on the last column to add the output streams.



  3. Adding Processing Blocks: Click on the intermediate blocks, provide block names, and click Save to add the blocks.



    💡Note: In the Pipeline Editor, the first and last columns representing I/O streams are highlighted in green, while intermediate processing blocks are distinguished by a pink color. This color scheme helps visually identify different block types during pipeline configuration.

  4. Block Editor: Click on any block to navigate to the Block Editor. Here, you will be able to apply the required plugins and filter expressions to contextualize the data based on your specific requirements. Refer to the Block Editor documentation in the subsequent section for comprehensive guidance on working with the Block Editor and customizing your data processing blocks.

Working with Block Editor

  1. Enter the Block Editor from the Pipeline Editor by clicking on the desired block. This opens the Block Editor page.


  2. On the Block Editor page, locate the Add Plugin button and click on it. A pop-up will appear, displaying all available plugins.



  3. Browse through the plugins and select the one that suits your processing requirements.
  4. After adding a plugin, use the arrow button to expand the plugin details.



  5. Expand the plugin field to provide a description and add the plugin-specific settings (configuration). For detailed information on each plugin and its specific settings, refer to the plugin guide here.



  6. Add additional plugins within the same block to fulfill various processing needs.

    💡Note: Each block within the Block Editor can incorporate one or multiple plugins. You have the flexibility to add as many blocks as necessary, either to accommodate multiple processing paths or to group specific processing steps within a single block. This allows for a highly customizable and versatile approach to designing your data pipeline.

  7. Optionally, you can include a Filter Expression for this block to provide routing information.

    For instance, the following example illustrates a filter expression:
    event.has('leg') && event.get('leg').asText() == 'Request to IBMB for getting current balance' ||
    event.has('leg') && event.get('leg').asText() == 'Request to CBS for getting current balance' ||
    event.has('leg') && event.get('leg').asText() == 'Response from CBS for getting current balance' ||
    event.has('leg') && event.get('leg').asText() == 'Final response from IBMB for getting current balance' ||
    event.has('log_type') && event.get('log_type').asText() == 'error'


    The condition checks whether the ‘leg’ field has specific values related to the process of obtaining the current balance from IBMB and CBS. Additionally, it verifies if the ‘log_type’ is ‘error’.

    By using this expression in a filter, you can selectively include events in the ContextStream that match these conditions, providing a way to route and process data based on the defined logic.


  8. Click Save to return to the Pipeline Editor, completing the configuration within the Block Editor.

Connecting the Blocks in the Pipeline Editor

After configuring individual blocks and saving changes in the Block Editor, you’ll be directed back to the Pipeline Editor page, where you can add more blocks. Connecting blocks for efficient data flow in vuSmartMaps is straightforward. Each block has four connection points: a blue point on the left for receiving data, and green points on the right, top, and bottom for sending data to the next block or to branching blocks. To connect blocks, click on the connection point of one block and then on the desired connection point of another block, creating a link between them. This lets you build linear or branching configurations effortlessly.

While a straightforward configuration involves connecting blocks in a sequence, you may encounter scenarios where multiple processing paths are required. In such cases, the filter at the block level determines the accepted data for subsequent blocks. You can connect a block to multiple next blocks, configuring the receiving blocks with filter expressions to process specific data.

For instance, consider the following pipeline designed to contextualize logs coming from a typical Internet and mobile banking application:

In this pipeline, the ‘requests’, ‘response’, and ‘error’ blocks are configured with filter expressions for routing information:

Requests Block Filter Expression:

event.has('log_type') && event.get('log_type').asText() == 'request'

Similarly, Response Block Filter Expression:

event.has('log_type') && event.get('log_type').asText() == 'response'

And, Error Block Filter Expression:

event.has('log_type') && event.get('log_type').asText() == 'error'

These expressions ensure that events are routed through the respective blocks based on the ‘log_type’ field, enabling precise processing of logs for requests, responses, and errors.

💡Note: The initial processing blocks at the first level cannot incorporate a filter expression. All incoming data from the input stream must be processed entirely by these blocks. Filter expressions can be applied in subsequent stages to precisely control data routing and processing.

Saving and Publishing the Pipeline

Save and publish your Data Pipeline on the Pipeline Editor page. Click Publish to activate and deploy the configured pipeline. If needed, revert to the previously published version, or click Cancel to discard unsaved changes.

There are 5 buttons available on the pipeline editor:

  1. Debug: Debug the pipeline while it is in Draft mode. This is explained in detail here.
  2. Publish: Publish the draft pipeline and deploy it.
  3. Revert to Published: The draft will be overwritten by the published pipeline.
  4. Cancel: Discard the unsaved changes.
  5. Save: Save the current changes in draft mode.

To debug the Data Pipeline, please follow the instructions provided here. To manage the added Data Pipeline, refer to the instructions here.

Create DataStore Connectors

Create DataStore Connectors to facilitate the transfer of contextualized data to the designated data store.

  1. On the ContextStreams page, navigate to the DataStore Connector tab and click on the + button to initiate the creation of a DataStore Connector.



  2. You will be redirected to the Create DataStore Connector page.



  3. First, choose the DataStore Connector type (ES Sink or JDBC Sink) based on your requirements.
  4. Provide the specific details for the DataStore Connector and click on the Save button to finalize its creation.

By following these steps, you can seamlessly create a DataStore Connector tailored to your needs, whether it’s an ES Sink or JDBC Sink connector, with the option to include advanced configurations for more nuanced control.

ES Sink Connector

Select the ES sink connector and fill in the following required details:

  • Connector name: Provide a globally unique name for the connector.
  • ES URL: List the Elasticsearch HTTP connection URLs.
  • Maximum tasks: Specify the maximum number of tasks.
  • ES index name: Enter the Elasticsearch Index Name.
  • Ignore Document ID: Choose whether to use Kafka Message Key as Document ID.
  • Data streams: List topics to consume, separated by commas.
  • Description: Provide a description for the ES Sink connector.
  • Advanced Settings (optional): Include a JSON configuration with advanced properties if needed (a sample is shown after this list). The most commonly used advanced settings for the ES Sink connector are as follows:
    1. batch.size
      • Type: int, Default: 2000, Valid Values: [1,…,1000000]
      • Number of records processed as a batch when writing to Elasticsearch.
    2. max.in.flight.requests
      • Type: int, Default: 5, Valid Values: [1,…,1000]
      • Maximum number of indexing requests allowed to be in-flight to Elasticsearch before blocking further requests.
    3. flush.timeout.ms
      • Type: long, Default: 180000 (3 minutes), Valid Values: [1000,…,604800000]
      • Timeout in milliseconds for periodic flushing and waiting for buffer space when adding records. Task fails if this timeout is exceeded.
    4. max.retries
      • Type: int, Default: 5, Valid Values: [0,…,2147483647]
      • Maximum number of retries allowed for failed indexing requests. Task fails if retry attempts are exhausted.
    5. read.timeout.ms
      • Type: int, Default: 3000 (3 seconds), Valid Values: [0,…,604800000]
      • Time in milliseconds to wait for Elasticsearch server to send a response. Task fails if any read operation times out.
    6. drop.invalid.message
      • Type: boolean, Default: false
      • Whether to drop Kafka messages when they cannot be converted to an output message.
    7. behavior.on.null.values
      • Type: string, Default: FAIL, Valid Values: [IGNORE, DELETE, FAIL]
      • How to handle records with a non-null key and a null value.
    8. behavior.on.malformed.documents
      • Type: string, Default: FAIL, Valid Values: [IGNORE, DELETE, FAIL]
      • How to handle records rejected by Elasticsearch due to malformed document exceptions.
    9. write.method
      • Type: string, Default: INSERT, Valid Values: [INSERT, UPSERT]
      • Method used for writing data to Elasticsearch (INSERT or UPSERT). UPSERT may require more time and resources.
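
For reference, a minimal Advanced Settings sketch for an ES Sink connector, using only the properties listed above, might look like the following. The values are illustrative (here, a configuration that drops or ignores problematic records rather than failing the task); adjust them to your workload and data-quality requirements.

  {
    "batch.size": 2000,
    "flush.timeout.ms": 180000,
    "drop.invalid.message": true,
    "behavior.on.null.values": "IGNORE",
    "behavior.on.malformed.documents": "IGNORE"
  }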

JDBC Sink Connector

Selecting the JDBC sink connector will require the following details:

  • Connector name: Provide a globally unique name for the connector.
  • Maximum tasks: Specify the maximum number of tasks.
  • JDBC connection URL: Enter the JDBC Connection URL.
  • JDBC connection user: Provide the JDBC connection user.
  • JDBC connection password: Enter the JDBC connection password.
  • Table name: Specify the destination Table name.
  • Data streams: List topics to consume, separated by commas.
  • Description: Provide a description for the JDBC Sink connector.
  • Advanced Settings (optional): Include a JSON configuration with advanced properties if needed (a sample is shown after this list). The most commonly used advanced settings for the JDBC Sink connector are as follows:
    1. insert.mode
      • Type: string, Default: insert, Valid Values: [insert, upsert, update]
      • The insertion mode to use. The supported modes are:
        1. insert: Use standard SQL INSERT statements.
        2. upsert: Use the appropriate upsert semantics for the target database if supported by the connector. Requires additional configuration of pk.mode and pk.fields.
        3. update: Use the appropriate update semantics for the target database if supported.
    2. batch.size
      • Type: int, Default: 3000, Valid Values: [0,…]
      • Specifies how many records to attempt to batch together for insertion into the destination table, when possible.
    3. delete.enabled
      • Type: boolean, Default: false
      • Whether to treat null record values as deletes. Requires pk.mode to be set to record_key.
    4. pk.mode
      • Type: string, Default: none, Valid Values: [none, kafka, record_key, record_value]
      • The primary key mode. Supported modes are:
        1. none: No keys are utilized.
        2. kafka: Apache Kafka coordinates are used as the primary key.
        3. record_key: Field(s) from the record key are used.
        4. record_value: Field(s) from the record value are used.
    5. pk.fields
      • Type: list, Default: none
      • A list of comma-separated primary key field names. The runtime interpretation depends on pk.mode.
    6. fields.whitelist
      • Type: list, Default: “”
      • List of comma-separated record value field names. If empty, all fields from the record value are utilized, otherwise used to filter to the desired fields. Note that pk.fields is applied independently for the primary key columns.
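
For reference, a minimal Advanced Settings sketch for a JDBC Sink connector, using only the properties listed above, might look like the following. The values are illustrative: transaction_id is a hypothetical primary key field name, and upsert mode is only valid if the target database supports it (and, as noted above, requires pk.mode and pk.fields).

  {
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": "transaction_id",
    "batch.size": 3000
  }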

💡Note: The most used advanced settings, common for both ES and JDBC Sink Connectors, are as follows:

  • topics.regex
    • Type: string, Default: “”, Valid Values: valid regex
    • Regular expression for selecting topics to consume. Specify either ‘topics’ or ‘topics.regex,’ not both.
  • errors.tolerance
    • Type: string, Default: none, Valid Values: [none, all]
    • Determines the behavior for handling errors during connector operation. ‘none’ results in an immediate task failure, while ‘all’ skips problematic records.
  • errors.log.enable
    • Type: boolean, Default: false
    • If true, records details of errors, failed operations, and problematic records in the Connect application log.
  • errors.log.include.messages
    • Type: boolean, Default: false
    • Determines whether to include Connect record details in the log for failed operations. For sink records, logs topic, partition, offset, and timestamp. For source records, logs key, value, headers, timestamp, Kafka topic, Kafka partition, source partition, and source offset.
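
As an illustration of these common settings, a connector that should skip problematic records and log them instead of failing the task could include the following in its Advanced Settings JSON (values shown are illustrative):

  {
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }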

💡Note: In most cases, the default values are well considered and work effectively without changes.

To manage the added DataStore Connectors, refer to the instructions here.
