Get Data from a Stream Layer

Getting data from a stream layer involves using the stream API to create a subscription to the layer, then reading messages from the layer.

The process for getting data from a stream layer is:

  1. Obtain an authorization token.
  2. Get the API base URLs.
  3. Create a subscription.
  4. Read messages.
  5. Commit offsets.

Each of these steps is described in detail below.

Note

Some stream API endpoints use the base URL returned by the subscription request instead of the base URL returned by the API Lookup service. Pay close attention to the URL mentioned in the stream API documentation for different endpoints.

Obtain an Authorization Token

Obtain an authorization token for your HTTP requests. For instructions, see the Authentication and Authorization Developer Guide.

Get API Base URLs

Use the API Lookup service to get the API endpoints for the catalog you want to get data from. For instructions, see the API Lookup Developer Guide.

Create a Subscription

Once you have an authorization token and the API base URLs, you can create a subscription to the stream layer. There are two subscription modes, serial and parallel, described below.

Serial

Use serial subscription if your application reads smaller volumes of data using a single subscription. This is the default subscription mode. With serial subscription, the application subscribes to a stream layer by making a subscription request, and the server issues a nodeBaseURL and a subscriptionId. The application then reads messages and commits offsets as many times as needed using the provided nodeBaseURL.

For a message size of 8 KB and an ingestion rate of 175 messages per second, this mode is expected to yield a throughput of 1.37 MBps.

Parallel

Use parallel subscription if your application reads large volumes of data in parallel. The subscription and message reading workflow is similar to serial subscription, except that you can create multiple subscriptions for the same aid, catalog, layer, group.id combination using multiple processes or threads. This allows you to read and commit messages for each subscription in parallel.

You can optionally provide a consumerId when you subscribe to identify your consumer in a group. If you do not provide one, the system creates a unique consumerId. For a given aid, catalog, layer, group.id, consumerId combination, the subscriptionId is always the same. So instead of persisting a subscriptionId, you can get the same subscriptionId back by providing the same values for aid, catalog, layer, group.id, consumerId in a subscribe request and resume from where the consumer left off (assuming the subscription's time-to-live has not expired).

For a message size of 10 KB and a layer created with 64 MBps (megabytes per second) egress, the system yielded a throughput of 64 MBps, reading with 32 subscriptions in parallel. Note: the amount of data available in the layer can be a limitation.

Caution

Do not use serial subscription for creating multiple subscriptions for the same aid, catalog, layer, group.id combination. There can be only one consumer created in the server for this combination, so two or more applications/processes/threads trying to access the server with different tokens may lead to subscriptions not making any progress. This is because the system might classify it as a token refresh event and might return the same set of messages.

Once you have decided which subscription mode to use, create the subscription using this request:

Serial Subscription

POST /<Base path for the stream API from the API Lookup Service>/layers/<Layer ID>/subscribe?mode=serial HTTP/1.1
Host: <Hostname for the stream API from the API Lookup Service>
Authorization: Bearer <Authorization Token>
Content-Type: application/json

Parallel Subscription

One unit of parallelism currently equals 1 MBps (megabytes per second) inbound or 2 MBps outbound, whichever is greater, rounded up to the nearest integer. The number of subscriptions within the same group cannot exceed the allowed parallelism.

You can calculate the maximum number of subscriptions in a group by looking at the stream layer's inbound and outbound throughput, which you can find in the platform portal in the layer's configuration page. First, divide the configured outbound throughput by 2 and round the result up to the nearest integer. Compare the result to the configured maximum inbound throughput. Whichever number is greater is the maximum number of subscriptions.

For example, say you have a stream layer configured as follows:

Inbound throughput: 1 MBps Outbound throughput: 3 MBps

To calculate the maximum number of subscriptions per group:

  1. 3 / 2 = 1.5
  2. 1.5 rounded up to the nearest integer: 2
  3. 2 is greater than the inbound throughput (1) so the maximum number of subscriptions in a group is 2.

Another example:

Inbound throughput: 5 MBps Outbound throughput: 2 MBps

To calculate the maximum number of subscriptions per group:

  1. 2 / 2 = 1
  2. 1 is already an integer so no need to round up.
  3. 1 is less than the inbound throughput (5), so the maximum number of subscriptions in a group is 5.
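The calculation above can be sketched as a small helper. This is illustrative only; the function name and signature are invented for this example and are not part of the stream API.

```python
import math

def max_subscriptions(inbound_mbps: float, outbound_mbps: float) -> int:
    """Maximum parallel subscriptions per group: the greater of the
    configured inbound throughput and half the configured outbound
    throughput, each rounded up to the nearest integer."""
    outbound_units = math.ceil(outbound_mbps / 2)
    inbound_units = math.ceil(inbound_mbps)
    return max(inbound_units, outbound_units)

# First example:  inbound 1 MBps, outbound 3 MBps -> 2 subscriptions
# Second example: inbound 5 MBps, outbound 2 MBps -> 5 subscriptions
```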

POST /<Base path for the stream API from the API Lookup Service>/layers/<Layer ID>/subscribe?mode=parallel&consumerId=<consumerId> HTTP/1.1
Host: <Hostname for the stream API from the API Lookup Service>
Authorization: Bearer <Authorization Token>
Content-Type: application/json

The response contains the base URL and subscription ID to use to read messages.

{
  "nodeBaseURL": "<The base URL to use to access the subscription.>",
  "subscriptionId": "<The unique ID for this subscription>"
}

Subscription Properties

The following Kafka consumer properties can be overridden by passing them in the HTTP body of the subscribe request.

  • auto.commit.interval.ms
  • auto.offset.reset
  • enable.auto.commit
  • fetch.max.bytes
  • fetch.max.wait.ms
  • fetch.min.bytes
  • group.id
  • max.partition.fetch.bytes
  • max.poll.records

All these are optional. If not provided, the defaults will be used. For information about the defaults, see the API Reference for the stream API.
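As a sketch, a subscribe request body overriding a few of these properties might be built as follows. The wrapper key kafkaConsumerProperties and the specific values here are assumptions made for illustration; confirm the exact body schema in the API Reference for the stream API.

```python
import json

# Hypothetical property overrides for the subscribe request body.
# Only include properties you want to change; anything omitted falls
# back to the defaults. Whether the properties sit under a wrapper key
# such as "kafkaConsumerProperties" should be verified in the stream
# API Reference -- it is assumed here.
overrides = {
    "kafkaConsumerProperties": {
        "group.id": "my-consumer-group",   # hypothetical group name
        "auto.offset.reset": "earliest",   # start from oldest retained message
        "max.poll.records": "100",         # cap messages returned per read
    }
}

body = json.dumps(overrides)
```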

Read Messages

Once you have created a subscription, you can read messages from the stream using the nodeBaseURL and subscriptionId obtained in the Create a Subscription step.

GET /layers/<Layer ID>/partitions?subscriptionId=<subscriptionId from prior step> HTTP/1.1
Host: <nodeBaseURL from prior step>
Authorization: Bearer <Authorization Token>
Accept: application/json

The response contains an array of messages, each containing a metaData object and an offset object. The contents of the metaData object vary depending on the message size:

  • If the message size is greater than 1 MB, metaData contains the dataSize and dataHandle fields.
  • If the message size is less than or equal to 1 MB, metaData contains only the data field.

For example, in the following response the first message is less than 1 MB and the second message is greater than 1 MB.

{
  "messages": [
    {
      "metaData": {
        "partition": "<UTF-8 byte encoded string of HERE Partition ID>",
        "data": "<UTF-8 byte encoded string of the actual data>",
        "timestamp": <The time the message was accepted by the HERE platform>
      },
      "offset": {
        "partition": <Kafka Topic partition ID>,
        "offset": <Kafka Message offset ID>
      }
    },
    {
      "metaData": {
        "partition": "<UTF-8 byte encoded string of HERE Partition ID>",
        "checksum": "<Checksum of payload (default SHA1) encoded as hex>",
        "compressedDataSize": <compressed (gzipped) size in bytes of the blob>,
        "dataSize": <un-compressed data size in bytes of the blob>,
        "dataHandle": "<reference to blob in the Blob service>",
        "timestamp": <The time the message was accepted by the HERE platform>
      },
      "offset": {
        "partition": <Kafka Topic partition ID>,
        "offset": <Kafka Message offset ID>
      }
    }
  ]
}
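The two metaData shapes above can be handled by branching on the presence of dataHandle. The following sketch is illustrative only; the helper name is invented, and actually fetching the payload of large messages from the Blob service is left out.

```python
def split_messages(response: dict):
    """Separate inline messages (payload carried in metaData.data) from
    blob-referenced messages (payload behind metaData.dataHandle)."""
    inline, by_handle = [], []
    for message in response.get("messages", []):
        meta = message["metaData"]
        if "dataHandle" in meta:
            # > 1 MB: payload must be fetched from the Blob service
            # using the dataHandle reference
            by_handle.append(message)
        else:
            # <= 1 MB: payload is carried inline in the data field
            inline.append(message)
    return inline, by_handle
```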

For more information about the response, see the API Reference for the stream API.

You should unsubscribe when you are no longer consuming data from the stream. For more information, see Unsubscribe from a Stream Layer.

Commit Offsets

After reading data, you should commit the offset of the last message read from each partition so that your application can resume reading new messages from the correct position in each partition if there is a disruption to the subscription, such as an application crash. Committed offsets are also useful if you delete a subscription and then recreate a subscription for the same layer, because the new subscription can start reading data from the committed offset.

To commit offsets, use the stream API's /offsets endpoint, specifying the subscription's nodeBaseURL and subscriptionId obtained in the Create a Subscription step. The request body should contain the offset of the last message read in each partition. Do not pass offset + 1 as described in the Kafka consumer documentation; the HERE platform adds 1 to the offset in the request body as required.

PUT /layers/fake-layer-name/offsets?subscriptionId=<subscriptionId> HTTP/1.1
Host: <nodeBaseURL>
Authorization: Bearer <Authorization Token>
Content-Type: application/json
Accept: application/json

{
  "offsets": [
    {
      "partition": <Partition ID>,
      "offset": <Offset Number>
    },
    {
      "partition": <Partition ID>,
      "offset": <Offset Number>
    }
  ]
}
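A sketch of building this request body from a batch of messages returned by /partitions, committing the last offset read in each partition as-is (no +1, per the note above). The helper name is invented for this example.

```python
import json

def commit_body(messages: list) -> str:
    """Build the /offsets request body from a batch of read messages:
    the last offset seen in each partition, unmodified (the platform
    adds 1 on its side)."""
    last = {}
    for message in messages:
        off = message["offset"]
        last[off["partition"]] = off["offset"]
    return json.dumps({
        "offsets": [
            {"partition": p, "offset": o} for p, o in sorted(last.items())
        ]
    })
```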

For example, if you specified partition 0 and offset 6 for the first offset, and partition 1 and offset 1 for the second, the next time you read data using this subscription it will return messages starting from offset 7 in partition 0 and offset 2 in partition 1.

Seeking Stream Data

When you read data from a stream layer you will typically want to read messages starting from the first message following the last one you read. However, you can instead read data from a different offset, allowing you to skip forward or backward in the stream.

To read data from a specific offset:

  1. Make a request to the /seek endpoint to specify the offsets from which you want to start reading data. In the request, the values for nodeBaseURL and subscriptionId are the ones returned in the response of the subscription request described in Create a Subscription. For example:

     PUT /layers/fake-layer-name/seek?subscriptionId=<subscriptionId> HTTP/1.1
     Host: <nodeBaseURL>
     Authorization: Bearer <Authorization Token>
     Content-Type: application/json
     Accept: application/json

     {
         "offsets": [
             {
                 "partition": <Partition ID>,
                 "offset": <Offset Number>
             },
             {
                 "partition": <Partition ID>,
                 "offset": <Offset Number>
             }
         ]
     }
    
  2. Make a request to the /partitions endpoint to read data as described in Read Messages.
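One common reason to seek is rewinding a consumer. The following sketch builds a /seek request body that steps each partition back a fixed number of messages; the helper name and the idea of tracking current offsets in a dict are assumptions of this example, not part of the stream API.

```python
def seek_offsets(current: dict, rewind: int) -> dict:
    """Build a /seek request body that moves each partition back by
    `rewind` messages from its current offset, flooring at offset 0.

    current: mapping of partition ID -> current offset in that partition.
    """
    return {
        "offsets": [
            {"partition": p, "offset": max(o - rewind, 0)}
            for p, o in sorted(current.items())
        ]
    }
```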

Refreshing Authentication Tokens

In order to continually read messages from a stream layer, you will need to refresh the authentication token used by the GET /layers/<Layer ID>/partitions request before it expires. There is no need to unsubscribe and subscribe again during a token refresh.

Expect a small drop in performance during the token change, as the server has to perform more work. We recommend that you minimize the number of token refresh operations.

Note

Token refresh is only permitted on the /partitions and /seek endpoints.

Message Ordering Guarantee

Systems requiring a strong message ordering guarantee (at the stream partition level) should follow these recommendations:

  1. Use serial subscription mode instead of parallel subscription mode. In parallel mode, partitions can be distributed among the subscriptions belonging to the same group.id, and upon system failures or scale out/in, partitions can be rebalanced across subscriptions, which can break the ordering guarantee during the rebalance.
  2. Keep the number of in-flight HTTP requests for a given subscription to a maximum of one; that is, make the first request and wait for its response before making the second request.
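The second recommendation amounts to a strictly sequential consume loop. The sketch below models it with injected fetch and commit callables standing in for the /partitions and /offsets requests (all names here are invented); each call completes before the next begins, so at most one request is ever in flight.

```python
def consume_serially(fetch, commit, max_batches: int):
    """Strictly sequential consume loop: at most one in-flight request.

    fetch:  callable returning the next batch of messages (stand-in for
            the GET /partitions request); an empty batch ends the loop.
    commit: callable taking a {partition: offset} mapping (stand-in for
            the PUT /offsets request).
    """
    processed = []
    for _ in range(max_batches):
        batch = fetch()              # completes before anything else runs
        if not batch:
            break
        processed.extend(batch)
        last = {}
        for message in batch:
            off = message["offset"]
            last[off["partition"]] = off["offset"]
        commit(last)                 # commit last offset per partition
    return processed
```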
