Layer Operations
Overview
A layer contains semantically related data of the same type and format. Layers are subdivided into smaller units called partitions, which are reasonably-sized units for caching and processing. Partitions can store any binary data: it is published and retrieved without modification. However, you can optionally define the structure and encoding of the data by associating a content type and schema with the layer. The content type, such as application/geo+json, describes the encoding of the data stored in each partition according to internet standards. The schema, for content types that need one such as application/x-protobuf, defines the exact data structure for the partitions in the layer so that data producers and consumers know how to generate and interpret encoded data.
See the Data API for more information on layers and partitions.
The HERE Data SDK for Python allows for interaction with these layer types:
- Versioned layer
- Volatile layer
- Index layer
- Stream layer
- Interactive Map layer
- Object Store layer
Each layer type has its own usage and storage patterns, and separate classes with methods for that type. For information about all methods, see the layer module API documentation.
- In the Read from Layer section below, you can find details on how to read data from each of the supported layer types.
- For information on how to write data to each of these layer types, see the Write to Layer section.
- For information on writing to multiple layers of a catalog simultaneously, see the Catalog Operations section.
- For information on creating new layers and modifying existing ones, see the Catalog Operations section.
Adapters
Adapters are used to encode and decode data and convert data between different representations, for example pandas DataFrame, geopandas GeoDataFrame, Python list and dict objects, and GeoJSON objects from the geojson Python library. Additional adapters can be added in the future, or users can develop and use their own, to interface the HERE Data SDK for Python with a variety of systems and content representations.
The HERE Data SDK for Python includes the following adapters:
- Default adapter, automatically used when no other adapter is specified
- GeoPandas Adapter
The default adapter is used unless otherwise specified; users can pass a different adapter to various functions. However, not every adapter supports every content type and representation. It's always possible to specify the parameters encode=False or decode=False when writing or reading to disable encoding, decoding, and format transformation. In this case, data in the form of raw bytes is returned to the user when reading layers; similarly, users have to provide already-encoded data when writing.
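For illustration, here is a minimal sketch of both options; the adapter parameter name and the GeoPandasAdapter import path are assumptions here, so consult the GeoPandas Adapter documentation for exact usage.
# given a versioned layer object named layer
# read raw bytes, skipping decoding and format transformation
partition, raw = next(iter(layer.read_partitions(partition_ids=[19377307], decode=False)))
# read the same partition as a DataFrame through the GeoPandas adapter;
# the import path and parameter name are assumptions, check the adapter documentation
from here.geopandas_adapter import GeoPandasAdapter
partition, df = next(iter(layer.read_partitions(partition_ids=[19377307], adapter=GeoPandasAdapter())))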
The following table summarizes the data formats currently supported natively by HERE Data SDK for Python, and via the use of additional data adapters.
Layer and content type | encode=False / decode=False | DefaultAdapter | GeoPandasAdapter |
--- | --- | --- | --- |
Versioned Layer | | | |
any | read/write bytes | | |
application/(x-)protobuf | | read/write Message | read/write DataFrame |
application/x-parquet | | | read/write DataFrame |
application/json | | read/write dict/list | read/write DataFrame |
application/(vnd.)geo+json | | read/write FeatureCollection | read/write GeoDataFrame |
text/csv | | read/write List[dict] | read/write DataFrame |
Volatile Layer | | | |
any | read/write bytes | | |
application/(x-)protobuf | | read/write Message | read/write DataFrame |
application/x-parquet | | | read/write DataFrame |
application/json | | read/write dict/list | read/write DataFrame |
application/(vnd.)geo+json | | read/write FeatureCollection | read/write GeoDataFrame |
text/csv | | read/write List[dict] | read/write DataFrame |
Index Layer | | | |
any | read/write bytes | | |
application/(x-)protobuf | | read/write Message | read/write DataFrame |
application/x-parquet | | | read/write* DataFrame |
application/json | | read/write dict/list | read/write DataFrame |
application/(vnd.)geo+json | | read/write FeatureCollection | read/write GeoDataFrame |
text/csv | | read/write List[dict] | read/write DataFrame |
Stream Layer | | | |
any | read/write bytes | | |
application/(x-)protobuf | | read/write Message | read/write DataFrame |
application/x-parquet | | | read/write DataFrame |
application/json | | read/write dict/list | read/write DataFrame |
application/(vnd.)geo+json | | read/write FeatureCollection | read/write GeoDataFrame |
text/csv | | read/write List[dict] | read/write DataFrame |
Interactive Map Layer | | | |
application/(vnd.)geo+json | | read/write FeatureCollection | read/write GeoDataFrame |
Object Store Layer | | | |
any | | read/write bytes | |
(*) Writing is supported for only one partition at a time.
Read from Layer
You can read partition metadata and the content of each partition (referred to as partition data) from the following layers:
- Versioned
- Volatile
- Index
- Stream
The functions read_partitions and read_stream return iterators, to consume one value at a time.
Note
Partition metadata - and the concept of partitions in general - does not apply to Interactive Map or Object Store layers. See the sections below dedicated to these layer types for the read methods applicable to each.
To execute the read examples below, you would first need to run the following code that defines some example catalogs and layers:
from here.platform import Platform
platform = Platform()
oma_catalog = platform.get_catalog('hrn:here:data::olp-here:oma-3')
versioned_layer = oma_catalog.get_layer('topology_geometry_segment')
wx_catalog = platform.get_catalog('hrn:here:data::olp-here:live-weather-eu')
volatile_layer = wx_catalog.get_layer('latest-data')
sdii_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-sdii-sample-berlin-2')
index_layer = sdii_catalog.get_layer('sample-index-layer')
traffic_catalog = platform.get_catalog('hrn:here:data::olp-here:olp-traffic-1')
stream_layer = traffic_catalog.get_layer('traffic-incidents-delta-stream')
samples_catalog = platform.get_catalog('hrn:here:data::olp-here:here-geojson-samples')
objectstore_layer = samples_catalog.get_layer('objectstore')
test_catalog = platform.get_catalog(...)
interactive_map_layer = test_catalog.get_layer(...)
Read Partitions/Data from Versioned Layer
To read data stored in a versioned layer, refer to the read_partitions function of the VersionedLayer class. Provided a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.
It's possible to specify which version of the catalog to query; by default, the latest version is used.
While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata and obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information and an exhaustive list of parameters, including adapter-specific parameters, consult the documentation of VersionedLayer.
Example: reading two partitions
The read_partitions method returns an iterator over objects, one for each requested partition. Each object is a tuple, the first element being of type VersionedPartition and the second being the contents of that partition. The partition content is decoded according to the content type (and schema, if applicable) specified in the layer configuration, following the matrix above.
partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333])
for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'SegmentPartition'>
19377333: <class 'SegmentPartition'>
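As noted above, omitting partition_ids reads the whole layer; a minimal sketch, keeping in mind that this can download a large amount of data for big layers:
# reads every partition in the layer at the latest catalog version
for versioned_partition, partition_content in versioned_layer.read_partitions():
    print(versioned_partition.id)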
Example: reading a specific catalog version and skipping decoding
In this example, data is not decoded and partition_content is of type bytes.
partitions = versioned_layer.read_partitions(partition_ids=[19377307, 19377333], version=10, decode=False)
for p in partitions:
    versioned_partition, partition_content = p
    print(f"{versioned_partition.id}: {type(partition_content)}")
19377307: <class 'bytes'>
19377333: <class 'bytes'>
Example: obtaining the metadata and fetching the data manually
partitions = versioned_layer.get_partitions_metadata(partition_ids=[19377307, 19377333])
for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)
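Once the raw blob is retrieved, it can be decoded manually. As a minimal sketch, assuming a hypothetical layer json_layer whose content type is application/json, the standard json module suffices:
import json

partitions = json_layer.get_partitions_metadata(partition_ids=["p1"])
for partition in partitions:
    raw = partition.get_blob()
    decoded = json.loads(raw)  # manual decoding, assuming application/json content
    print(decoded)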
Read Partitions/Data from Volatile Layer
To read data stored in a volatile layer, refer to the read_partitions function of the VolatileLayer class. Provided a set of partition identifiers, the function returns the data associated with the specified partitions. If no partitions are specified, the content of the whole layer is returned.
While read_partitions queries layer metadata and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata and obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information and an exhaustive list of parameters, including adapter-specific parameters, consult the documentation of VolatileLayer.
Example: reading two partitions
The read_partitions method returns an iterator over objects, one for each requested partition. Each object is a tuple, the first element being of type VolatilePartition and the second being the contents of that partition. The partition content is decoded according to the content type (and schema, if applicable) specified in the layer configuration, following the matrix above.
partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262])
for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'WeatherConditionPartition'>
92262: <class 'WeatherConditionPartition'>
Example: skipping decoding
In this example, data is not decoded and partition_content is of type bytes.
partitions = volatile_layer.read_partitions(partition_ids=[92259, 92262], decode=False)
for p in partitions:
    volatile_partition, partition_content = p
    print(f"{volatile_partition.id}: {type(partition_content)}")
92259: <class 'bytes'>
92262: <class 'bytes'>
Example: obtaining the metadata and fetching the data manually
partitions = volatile_layer.get_partitions_metadata(partition_ids=[92259, 92262])
for partition in partitions:
    print(partition.id)
    data = partition.get_blob()
    print(data)
Read Partitions/Data from Index Layer
To read data stored in an index layer, refer to the read_partitions function of the IndexLayer class. The function returns all the data associated with index partitions matching an RSQL query.
Data is decoded according to the content type specified in the layer configuration. Automatic decoding is supported only for the content types indicated in the adapter support matrix above. For unsupported content types, pass decode=False and decode the data separately after reading.
While read_partitions queries the index metadata via RSQL and downloads the data associated with each partition in a single call, it's also possible to query just the partition metadata via RSQL and obtain and decode the associated data manually at a later time, if needed. To query just the partition metadata, use the get_partitions_metadata function.
For additional information and an exhaustive list of parameters, including adapter-specific parameters, consult the documentation of IndexLayer.
Example: query partitions and read all returned data
Each returned partition is of type IndexPartition and contains custom fields, as defined by the user who created the index layer. In this example, data is not decoded and partition_data is of type bytes.
partitions = index_layer.read_partitions(query="hour_from=ge=10", decode=False)
for partition, partition_data in partitions:
    print(partition.fields, partition_data)
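In RSQL, predicates can also be combined, with ; acting as a logical AND and , as a logical OR. A hypothetical compound query over the same field could look like this:
# partitions indexed with 10 <= hour_from < 12
partitions = index_layer.read_partitions(query="hour_from=ge=10;hour_from=lt=12", decode=False)
for partition, partition_data in partitions:
    print(partition.fields)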
Example: obtaining the metadata and fetching the data manually
partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10")
for partition in partitions:
    print(partition.fields)
    data = partition.get_blob()
    print(data)
Example: obtaining the part ids for fetching data by parts
parts = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")
Example: obtaining the metadata using part id and fetching the data manually
resp = index_layer.get_parts(num_requested_parts=4, billing_tag="billing-tag-value")
for val in resp['parts']:
    partitions = index_layer.get_partitions_metadata(query="hour_from=ge=10", part=val['partId'])
    for partition in partitions:
        print(partition.fields)
        data = partition.get_blob()
        print(data)
Read Partitions/Data from Stream Layer
To read from a stream layer, a subscription must first be created via the subscribe function of the StreamLayer class. The function instantiates a Kafka consumer on the HERE platform, which is later queried via its REST API to read messages from the layer. The subscribe function returns a StreamSubscription. Please unsubscribe when reading is complete to free resources on the platform.
To consume data stored in a stream layer, refer to the read_stream function. The function consumes the stream and returns the messages and corresponding content. This method requires a StreamSubscription.
While read_stream consumes the messages and downloads the data associated with each message in a single call, it's also possible to consume just the messages (partition metadata) and obtain and decode the associated data manually at a later time, if needed. To consume just the metadata, use the get_stream_metadata function. This function also requires a StreamSubscription.
For additional information and an exhaustive list of parameters, including adapter-specific parameters, consult the documentation of StreamLayer.
Example: consuming content
Data is decoded according to the content type specified in the layer configuration. The actual type of the content returned for each partition depends on the content type and adapter used, following the matrix above.
Each returned partition is of type StreamPartition. When the with block terminates, either successfully or unsuccessfully, subscription.unsubscribe() is called internally.
with stream_layer.subscribe() as subscription:
    partitions = stream_layer.read_stream(subscription=subscription)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
Example: skipping decoding
In this example, data is not decoded and the returned content is of type bytes.
subscription = stream_layer.subscribe()
try:
    partitions = stream_layer.read_stream(subscription=subscription, decode=False)
    for partition, content in partitions:
        print(f"{partition.id}: {content}")
finally:
    subscription.unsubscribe()
Example: obtaining the metadata and fetching the data manually
A distinguishing characteristic of the stream layer, compared for example to versioned and volatile layers, is that partition metadata (messages on the Kafka stream) can contain the data inline if it is small enough. The data may be included directly in each message, instead of being stored through the Blob API.
get_blob returns the data by retrieving it from the Blob API. get_data, specific to the stream layer, returns the data if inline and retrieves it from the Blob API only when needed. It is therefore recommended to use get_data.
subscription = stream_layer.subscribe()
try:
    partitions = stream_layer.get_stream_metadata(subscription=subscription)
    for partition in partitions:
        print(partition.id, partition.timestamp)
        data = partition.get_data()
        print(data)
finally:
    subscription.unsubscribe()
Example: consuming and producing content using direct Kafka
Direct Kafka support enables users to retrieve instances of a Kafka consumer and a Kafka producer. Using these instances, users can read from and write to the stream layer. These instances are configurable.
For more details on consumer configuration, see https://kafka.apache.org/11/documentation.html#newconsumerconfigs.
For more details on producer configuration, see https://kafka.apache.org/11/documentation.html#producerconfigs.
import json

producer = stream_layer.kafka_producer(value_serializer=lambda x: json.dumps(x).encode('utf-8'))
topic = stream_layer.get_kafka_topic()
for x in range(10):
    data = {'x': x, '2x': x * 2}
    producer.send(topic, value=data)
producer.close()

consumer = stream_layer.kafka_consumer(value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
    print(f"Message is {message.value}")
consumer.close()
Read Features from Interactive Map Layer
This layer type does not have the concept of partitions and encoded data: there are no functions that read raw data or support decode parameters. Interactive Map layers are designed around the concept of features, in the sense of GeoJSON FeatureCollection, instead of partitions.
When using the default adapter, a FeatureCollection or an iterator of Feature objects (both GeoJSON concepts) is returned directly.
Some of the functions to retrieve features from the layer:
get_features
search_features
iter_features
get_features_in_bounding_box
spatial_search
spatial_search_geometry
Example: Read multiple features from an Interactive Map layer using get_features
features = interactive_map_layer.get_features(feature_ids=["feature_id1", "feature_id2", "feature_id3"])
Example: Search for and retrieve features based on properties using search_features
features = interactive_map_layer.search_features(params={"p.name": "foo", "p.type": "bar"})
Example: Retrieve all features in a layer using iter_features
for feature in interactive_map_layer.iter_features():
    print(feature)
Example: Find and retrieve all features in a bounding box using get_features_in_bounding_box
bbox_features = interactive_map_layer.get_features_in_bounding_box(bounds=(-171.791110603, 18.91619, -66.96466, 71.3577635769))
Example: Find and retrieve all features within given radius of input point using spatial_search
features = interactive_map_layer.spatial_search(lng=-95.95417, lat=41.6065, radius=1000)
Example: Find and retrieve all features within arbitrary geometry using spatial_search_geometry
from shapely.geometry import Polygon
polygon = Polygon([(0, 0), (1, 1), (1, 0)])
features = interactive_map_layer.spatial_search_geometry(geometry=polygon)
Interactive Map Layer Subscriptions to a Destination Stream Layer
Some of the functions for Interactive Map Layer subscriptions to a destination stream layer:
get_all_subscriptions
subscribe
subscription_status
subscription_exists
get_subscription
unsubscribe
Example: List All Subscriptions get_all_subscriptions
subscriptions_gen = interactive_map_layer.get_all_subscriptions(limit=10)
for subscriptions_list in subscriptions_gen:
    print([subscription.subscription_hrn for subscription in subscriptions_list])
Example: Subscribe to Destination Stream Layer using subscribe
import uuid
from here.platform.layer import InteractiveMapSubscriptionType
x_idempotency_key=str(uuid.uuid4())
interactive_map_subscription = interactive_map_layer.subscribe(
    subscription_name="test-iml-subscription-name-123",
    description="this is a test iml subscription",
    destination_catalog_hrn="hrn:here:data::olp-cs:pysdk-test-catalog-for-iml-subs",
    destination_layer_id="stream-raw-demo",
    interactive_map_subscription_type=InteractiveMapSubscriptionType.PER_FEATURE)
interactive_map_subscription
Example: Get Subscription Status subscription_status
subscription_status = interactive_map_layer.subscription_status(status_token=interactive_map_subscription.status_token)
subscription_status, subscription_status.status
Example: Subscription Exists subscription_exists
subscription_exists = interactive_map_layer.subscription_exists(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription_exists
Example: Get Subscription get_subscription
subscription = interactive_map_layer.get_subscription(subscription_hrn=interactive_map_subscription.subscription_hrn)
subscription, subscription.subscription_name, subscription.subscription_hrn
Example: Unsubscribe from Destination Stream Layer using unsubscribe
import uuid
x_idempotency_key=str(uuid.uuid4())
unsubscribed = interactive_map_layer.unsubscribe(subscription_hrn=interactive_map_subscription.subscription_hrn)
unsubscribed, unsubscribed.status_token
Read Data from Object Store Layer
Object Store layers offer a key/value store that mimics the behavior of a filesystem. Keys equate to the file path and name (relative to the root of the layer); the corresponding values are bytes equating to the contents of that file. Layers of this type do not have the concept of partitions.
Functions provided for retrieving key/value information from Object Store layers include:
key_exists
list_keys
iter_keys
get_object_metadata
read_object
Example: Check if layer contains a given key with key_exists
key_found = objectstore_layer.key_exists(key = "berlin/districts_of_berlin_tiled/23618356")
Example: List all keys (subdirectories and files) under a given parent key (directory) with list_keys
The optional parent parameter specifies which level in the layer's key hierarchy to use as the root for the listing. If omitted, the root of the layer itself is used. The deep parameter indicates whether to retrieve the entire hierarchy of descendants (True) or only direct descendants (False). If omitted, deep is assumed to be False. This method returns a list of keys as strings.
everything_under_berlin_folder = objectstore_layer.list_keys(parent = "berlin", deep = True)
Example: Get an Iterator over all keys under a given parent key (directory) with iter_keys
This method is analogous to list_keys, but returns an iterator instead of a list.
berlin_files_iter = objectstore_layer.iter_keys(parent = "berlin", deep = True)
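The returned iterator can then be consumed lazily, for example:
for key in berlin_files_iter:
    print(key)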
Example: Get metadata for an object with get_object_metadata
This method returns metadata associated with a given key. The information contained includes:
- Last-Modified date
- Data size
- Content type
- Content encoding
obj_metadata = objectstore_layer.get_object_metadata("berlin/districts_of_berlin_tiled/23618355")
Example: Read object for a given key with read_object
The read_object method offers an optional include_metadata parameter that specifies whether to also return the metadata associated with an object. By default, the value is False and read_object returns only the requested content. If True, a tuple containing the content plus the object metadata is returned.
geojson_content = objectstore_layer.read_object("berlin/districts_of_berlin_tiled/23618355")
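To also obtain the metadata described above, set include_metadata=True and unpack the returned tuple:
geojson_content, obj_metadata = objectstore_layer.read_object("berlin/districts_of_berlin_tiled/23618355", include_metadata=True)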
Write to Layer
The examples below show how to publish data to each supported layer type.
The functions write_partitions and write_stream accept iterators, to produce one value at a time. In case an Adapter is used, the type to pass to the write functions is adapter-specific.
In the examples below, actual data is represented by the placeholder ... while partition identifiers are plausible.
Note (when running within Jupyter environment)
When calling write_partitions with a payload of less than 50 MB, all data is published in a single transaction. When the data size exceeds 50 MB, a multipart upload is used to improve performance. The parallelization involved in multipart uploads interferes with Jupyter's internal asynchronous behavior if not explicitly handled. If you will be uploading more than 50 MB to a single partition, include the following instructions in your notebook before attempting to publish:
import nest_asyncio
import asyncio
nest_asyncio.apply(loop=asyncio.get_event_loop())
Write to Versioned Layer
To write to one or more versioned layers, a Publication must first be created. Publications for versioned layers work like transactions: it's possible to complete a publication, or cancel it to drop all the metadata uploaded until that moment.
Once a Publication is available, one or more write_partitions calls can be used to write data to a layer. Each write function is layer-specific. It's possible to call each write function more than once, for the same or multiple layers. See write_partitions for additional details.
Use set_partitions_metadata to update and delete the metadata of partitions of a layer without uploading the content at the same time. The content has to be uploaded separately beforehand. The function also provides a way to delete partitions as part of a publication.
Completing a publication involving one or more versioned layers creates a new version of the catalog. See init_publication for additional details.
Example: writing multiple layers and partitions with publication context manager
The following snippet creates a new catalog version in which partitions of multiple layers are added or modified. The content of each partition is encoded and uploaded. The transaction is committed when the with block terminates successfully, which calls Publication.complete() internally. If the with block terminates unsuccessfully, Publication.cancel() is called internally.
Data is encoded according to the content type set in the layer configuration.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_partitions(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_partitions(publication, { 377893751: ..., 377893752: ... })
    layerB.write_partitions(publication, { 377893753: ..., 377893754: ... })
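As a concrete illustration of the placeholders, assuming a hypothetical layer whose content type is application/json, plain Python dict objects can be passed and are encoded automatically:
json_layer = catalog.get_layer("J")  # hypothetical layer with content type application/json
with catalog.init_publication(layers=[json_layer]) as publication:
    json_layer.write_partitions(publication, { "p1": {"speed": 42, "unit": "km/h"} })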
Note: If the content type of your layer is not supported, or if reading or writing raw content is preferred, pass encode=False to the write_* functions to skip encoding and deal with raw bytes instead. This applies, for example, to the layer content type text/plain.
Example: writing multiple layers and partitions skipping encoding
Users can provide already-encoded data in the form of bytes compatible with the content type configured in each layer. In this case, encode=False should be specified.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
publication = catalog.init_publication(layers=[layerA, layerB])
try:
    layerA.write_partitions(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_partitions(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)
    publication.complete()
except Exception:
    publication.cancel()
    raise
Write to Volatile Layer
To write to one or more volatile layers, a Publication must first be created. Publications for volatile layers should be closed when no longer needed via the complete function, to free resources. The cancel function, due to the nature of volatile layers, has no effect: the layer is not versioned, it doesn't support transactions, and successful writes cannot be rolled back.
Once a Publication is available, one or more write_partitions calls can be used to write data to a layer. Each write function is layer-specific. It's possible to call each write function more than once, for the same or multiple layers. See write_partitions for additional details.
Use set_partitions_metadata to update and delete the metadata of partitions of a layer without uploading the content at the same time. The content has to be uploaded separately beforehand. The function also provides a way to delete partitions. Another way to delete partitions is delete_partitions.
Example: writing multiple layers and partitions with publication context manager
The following snippet writes to two volatile layers. The content of each partition is encoded and uploaded. When the with block terminates successfully, Publication.complete() is called internally. If the with block terminates unsuccessfully, Publication.cancel() is called internally.
Data is encoded according to the content type set in the layer configuration.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_partitions(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_partitions(publication, { 377893751: ..., 377893752: ... })
Example: writing multiple layers and partitions skipping encoding
Users can provide already-encoded data in the form of bytes compatible with the content type configured in each layer. In this case, encode=False should be specified.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
publication = catalog.init_publication(layers=[layerA, layerB])
try:
    layerA.write_partitions(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_partitions(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)
finally:
    publication.complete()
Write to Index Layer
Writing to an index layer does not need a publication. Writing to an index layer is currently supported for one partition at a time.
Use the write_single_partition function to add data and the corresponding metadata to the index layer. Use the delete_partitions function to remove the partitions of the index layer that match an RSQL query.
It is also possible to operate on the index layer metadata only, via set_partitions_metadata. This adds and deletes index partitions at once. The content for the added partitions has to be uploaded separately beforehand.
Example: adding one partition to a layer
Data is encoded according to the content type set in the layer configuration.
fields = {
"f1": 100,
"f2": 500
}
index_layer.write_single_partition(data=..., fields=fields)
Example: adding one partition to a layer skipping encoding
Users can provide already-encoded data in the form of bytes
compatible with the content type configured in each layer. In this case, encode=False
should be specified.
fields = {
"f1": 100,
"f2": 500
}
index_layer.write_single_partition(data=bytes(...), fields=fields, encode=False)
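Deleting matching partitions uses the same RSQL syntax as reading; the query parameter name below is assumed to mirror read_partitions, so verify against the delete_partitions documentation:
# removes all index partitions whose hour_from field is below 10
index_layer.delete_partitions(query="hour_from=lt=10")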
Write to Stream Layer
To write to one or more stream layers, a Publication must first be created. Publications for stream layers should be closed when no longer needed via the complete function, to free resources. The cancel function, due to the nature of stream layers, has no effect: it is not possible to delete messages from a Kafka stream once written.
Once a Publication is available, one or more write_stream calls can be used to write data to a layer. Each write function is layer-specific. It's possible to call each write function more than once, for the same or multiple layers. See write_stream for additional details.
Use append_stream_metadata to write to the stream just the metadata (messages) of a layer without uploading the content at the same time. The content has to be uploaded separately beforehand, or included in the data field of the messages when small enough.
Example: writing to streams with context manager
The content of each partition is encoded and uploaded. When the with block terminates successfully, Publication.complete() is called internally. If the with block terminates unsuccessfully, Publication.cancel() is called internally.
Data is encoded according to the content type set in the layer configuration.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
with catalog.init_publication(layers=[layerA, layerB]) as publication:
    layerA.write_stream(publication, { "a1": ..., "a2": ..., "a3": ... })
    layerB.write_stream(publication, { 377893751: ..., 377893752: ... })
Example: writing to streams skipping encoding
Users can provide already-encoded data in the form of bytes compatible with the content type configured in each layer. In this case, encode=False should be specified.
layerA = catalog.get_layer("A")
layerB = catalog.get_layer("B")
publication = catalog.init_publication(layers=[layerA, layerB])
try:
    layerA.write_stream(publication, { "a1": bytes(...), "a2": bytes(...) }, encode=False)
    layerB.write_stream(publication, { 377893751: bytes(...), 377893752: bytes(...) }, encode=False)
finally:
    publication.complete()
Write to Interactive Map Layer
This layer type does not have the concept of partitions and encoded data: there are no functions that write raw data or support encode parameters. The Interactive Map Layer API is modeled on the concept of the GeoJSON FeatureCollection. Consult the API documentation for full details.
When using the default adapter, a FeatureCollection or an iterator of Feature objects (both GeoJSON concepts) is passed directly as a parameter.
Example: writing GeoJSON features
from geojson import FeatureCollection, Feature, Point, Polygon
f1 = Feature(geometry=Polygon([[(0, 0), (0, 1), (1, 0), (0, 0)]]), properties={"a": 100, "b": 200})
f2 = Feature(geometry=Point((-1.5, 2.32)), properties={"a": 50, "b": 95})
features = FeatureCollection(features=[f1, f2])
interactive_map_layer.write_features(features)
Example: writing GeoJSON features from a file
geojson_file_path = "~/example.geojson"
interactive_map_layer.write_features(from_file=geojson_file_path)
Write to Object Store Layer
Writing data to an Object Store layer requires that you define a key and provide the data to be associated with that key.
- Keys are strings composed of any of the following characters: a-zA-Z0-9.[]=(){}/_-`
- Within a key, the / (slash) character is interpreted as a separator to define folder-like structures
- If the specified key already exists in the layer, the associated data is overwritten with the new data
- The data to be associated with a key can be given as either a local file or a bytes object
When writing data, you may optionally specify the content type of the data being uploaded. If not specified, the type is assumed to be application/octet-stream.
Example: write contents of a file
layerA = catalog.get_layer("A")
layerA.write_object(key = "dir1/name_1", path_or_data = "localdatafile.txt", content_type = "text/plain")
Example: write bytes object
layerA = catalog.get_layer("A")
a_bytes_object = b"example binary content"  # any bytes payload
layerA.write_object(key = "dir1/name_2", path_or_data = a_bytes_object)
Example: delete an existing object
To remove an existing object from a layer, use the delete_object method. This method provides an optional strict parameter which, if True, raises an exception in case the key does not exist.
layerA = catalog.get_layer("A")
layerA.delete_object(key = "dir1/name_1")
Example: copying an existing object
To copy an existing object within a layer, use the copy_object method. This method provides an optional replace parameter which, if True, replaces any object already stored at the target key.
layerA = catalog.get_layer("A")
layerA.copy_object(key = "dir1/name_2", copy_from = "dir1/name_1")
layerA.copy_object(key = "dir1/name_2", copy_from = "dir1/name_1", replace = True)