PlanetData
Network of Excellence
FP7 257641

D1.7 Distributed Streaming RDF Storage Engine

Coordinator: Alejandro Llaves (UPM), Oscar Corcho (UPM), Javier D. Fernández (UPM)
1st Quality Reviewer: Irini Fundulaki (FORTH)
2nd Quality Reviewer: Carolina Fortuna (IJS)
Deliverable nature: Report (R)
Dissemination level: Public (PU)
Contractual date of delivery: M48
Actual delivery date: M48
Version: 1.0
Total number of pages: 37
Keywords: RDF Stream Processing, RDF Stream Compression, Scalability
Abstract
In this deliverable, we describe two main contributions towards the development of a scalable RDF stream processing engine, which can also be reused by other ongoing RDF stream processing engine implementations. First, we present a set of parallelizable Storm operators together with an architecture overview of the prototype under development. Second, we introduce a novel approach for RDF stream compression called the Efficient RDF Interchange (ERI) format. At the end of the document, we outline an evaluation plan and the future work directions in our agenda. A short paper and a full paper related to this work have been accepted at the ISWC 2014 OrdRing workshop and the ISWC 2014 research track, respectively.
Executive Summary
In recent years, the amount of real-time data generated has increased substantially. Sensors attached to things and persons are transforming how we interact with our environment. Moreover, a growing number of applications depend on real-time data and need to complement the usual three levels of decision making (strategic, tactical, and operational) with real-time decision making. Extracting meaningful information from these streams of data is essential for some application areas and requires processing systems that scale to varying conditions in data sources, complex queries, and system failures. To tap these data sources and obtain relevant information, scalable processing infrastructures are required, as well as efficient approaches to enable data integration and fusion.
This deliverable describes two contributions that can be seen as foundations for the development of a scalable RDF stream processing engine: a set of physical operators implemented using Storm, which support the most typical types of operators required in stream-extended SPARQL query processing; and a binary format for the compression of RDF streams that reduces the overhead of transmitting this otherwise verbose format.
Document Information
IST Project Number: FP7 257641    Acronym: PlanetData
Full Title: PlanetData
Project URL: http://www.planet-data.eu/
Document URL:
EU Project Officer: Leonhard Maqua
Deliverable: Number D1.7    Title: Distributed Streaming RDF Storage Engine
Work Package: Number WP1    Title: Data Streams and Dynamicity
Date of Delivery: Contractual M48    Actual M48
Status: final, version 1.0
Nature: Report (R)
Dissemination Level: Public (PU)
Authors (Partner): UPM
Responsible Author: Alejandro Llaves (UPM)    E-mail: [email protected]    Phone: (+34) 913363670
Abstract (for dissemination): In this deliverable, we describe two main contributions towards the development of a scalable RDF stream processing engine, which can also be reused by other ongoing RDF stream processing engine implementations. First, we present a set of parallelizable Storm operators together with an architecture overview of the prototype under development. Second, we introduce a novel approach for RDF stream compression called the Efficient RDF Interchange (ERI) format. At the end of the document, we outline an evaluation plan and the future work directions in our agenda. A short paper and a full paper related to this work have been accepted at the ISWC 2014 OrdRing workshop and the ISWC 2014 research track, respectively.
Keywords: RDF Stream Processing, RDF Stream Compression, Scalability
Version Log
Issue Date | Rev. No. | Author | Change
05/09/2014 | 0.1 | Alejandro Llaves, Javier D. Fernández, and Oscar Corcho | Input based on work submitted to ISWC 2014.
12/09/2014 | 0.2 | Alejandro Llaves | Content refined and extended after the plenary meeting.
17/09/2014 | 0.3 | Alejandro Llaves and Oscar Corcho | Content modifications in sections 1 and 2 according to comments from second reviewer.
19/09/2014 | 0.4 | Alejandro Llaves | Overall content modifications according to comments from first reviewer. Evaluation plan moved to conclusions and future work section. Added full paper (ISWC 2014) on RDF stream compression to the Annex.
24/09/2014 | 0.5 | Alejandro Llaves and Oscar Corcho | Content refined and extended in sections 2, 3, and 4.
29/09/2014 | 1.0 | Alejandro Llaves and Oscar Corcho | Final draft. Examples enriched with more illustrative figures in section 3. Better integration of sections.
Table of Contents
Executive Summary ......................................................... 3
Document Information ...................................................... 4
1 Introduction ............................................................ 7
2 Background .............................................................. 8
  2.1 Lambda Architecture ................................................. 8
  2.2 Storm 101 ........................................................... 9
3 Towards an efficient RDF Stream Processing Engine ...................... 11
  3.1 A catalogue of Storm-based operators for queries over RDF streams .. 11
    3.1.1 Triple2Graph operator .......................................... 11
    3.1.2 Time-windowing operator ........................................ 11
    3.1.3 Simple Join operator ........................................... 12
    3.1.4 Project operator ............................................... 12
    3.1.5 A Storm topology example to resolve a SPARQL query ............. 12
  3.2 Architecture Overview .............................................. 14
  3.3 RDF Stream Compression ............................................. 15
4 Conclusions and Future Work ............................................ 17
5 ANNEX: Efficient RDF Interchange (ERI) Format for RDF Data Streams. ISWC 2014 paper ... 21
1 Introduction
The origins of Linked Stream Data [15, 10] lie in the enrichment of the Sensor Web with spatial, temporal, and thematic metadata [16]. Standardization initiatives such as the W3C Semantic Sensor Network Incubator Group1 and its SSN ontology [4] have fostered the publication of sensor datasets as Linked Stream Data and their consequent integration with other datasets. A more recent initiative is the W3C RDF Stream Processing (RSP) Community Group, which works on defining a common model for producing, transmitting, and continuously querying data streams encoded in RDF, and in which PlanetData members are active contributors.2
Extracting information from data streams is complex because of the heterogeneity of the data, the rate of data generation, and the high volumes emitted. Additionally, data provenance is often unclear because of the lack of proper metadata. One potential use case is real-time monitoring of public transportation in a city. Here, decisions on unexpected events, such as a car crash, should be taken within short time frames based on a set of spatio-temporal data streams coming from different providers. This means reasoning over data in the temporal order in which it is consumed by the processing system. Linked Stream Data may help to integrate datasets from different providers and to solve interoperability problems. Yet, the remaining challenges require solutions from a stream processing approach.
The development of scalable services for real-time stream processing involves support for high throughput, management of complex queries, low-latency response, fault tolerance, and statistics extraction, among others. Nowadays, cloud services offer solutions to some of these problems, e.g. by applying load balancing in the presence of input data bursts. We mainly focus on the efficient processing of user queries over RDF streams (C.1), which requires parallelization at the physical query operator level.
Distributed computing refers to the processing of data in distributed systems. The continuous transmission of data (C.2) between sources and processing nodes, and among processing nodes, may cause response delays that should be minimized when possible. We also focus here on reducing latency by addressing the size of the data exchanged.
Finally, the integration of historical and real-time data with background knowledge (C.3) is challenging in Web-scale environments. Many RSP systems combine background knowledge with real-time processing, but historical data management is often overlooked [11]. Traditional systems tend to store data aggregates in order to save space, but with the capacity of current systems it is possible (and recommended) to store all data in raw format and define views on data batches [12]. The efficient management of historical data is essential to detect trends in data, extract statistics, or compare old data with current data to identify anomalies [11], for instance, by aggregating metro users over the last hours and comparing the numbers to the average number of users during the last days.
In this deliverable, we present two main contributions that address the challenges described above. First, a preliminary set of join operators to efficiently resolve queries over RDF streams in a distributed environment. Second, a compression format for RDF streams that alleviates data transmission times. The remainder of the document is structured as follows: section 2 introduces background information about the architecture model and the proposed real-time processing system, i.e. Storm;3 section 3 is the main section of the deliverable and explains the design and implementation of the parallelizable Storm operators, an architecture overview of the real-time processing layer, and the RDF stream compression format; and finally, section 4 concludes the deliverable with the evaluation plan and some hints for future work.
1. http://www.w3.org/2005/Incubator/ssn/
2. http://www.w3.org/community/rsp/
3. http://storm.incubator.apache.org/
2 Background
This section explains basic concepts related to the architecture of the RDF streaming engine, as well
as the real-time processing technology used for the prototype implementation.
2.1 Lambda Architecture
The Lambda Architecture [12], see figure 2.1, addresses the problem of computing arbitrary functions on arbitrary data in real time by using three layers: a batch layer (e.g. based on Hadoop1) stores all the incoming data in an immutable master dataset and pre-computes batch views on historical data (section 1, C.3); a serving layer (e.g. based on a SQL/NoSQL database) indexes views on the master dataset; and a speed layer manages the real-time processing issues and requests data views depending on incoming queries [12].
Figure 2.1: Diagram of Lambda Architecture (from [12]). (A) Data input is consumed by the batch
layer and the speed layer. The batch layer stores all the raw data (B) and continuously computes query
functions to create batch views (C). The serving layer indexes batch views to access them quickly (D).
The speed layer only processes recent data to produce real-time views (E). Batch and real-time views
are integrated to respond to queries (F).
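Step (F), merging the batch and real-time views to answer a query, can be sketched with a toy count query (an illustrative Python simplification; all names here are hypothetical, not part of any Lambda Architecture implementation):

```python
def answer_count(key, batch_view, realtime_view):
    """A query result combines the pre-computed batch view (C, D) with
    the speed layer's real-time view over recent data (E)."""
    return batch_view.get(key, 0) + realtime_view.get(key, 0)

batch_view = {"metro_users": 10400}    # computed over the master dataset
realtime_view = {"metro_users": 37}    # covers data not yet batch-processed
print(answer_count("metro_users", batch_view, realtime_view))  # 10437
```

The speed layer's view only has to cover the short gap since the last batch run, which is what keeps real-time answers cheap.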
The reasons for implementing a Lambda Architecture to process huge amounts of data stem from problems with scalability and complexity in traditional systems. Conventional architectures deployed for this task, often built upon a web server and a centralized database, were not designed to deal with the huge volumes of data that are common nowadays. First, fault-tolerance mechanisms had to be implemented manually in traditional systems, whereas the Lambda Architecture provides built-in fault tolerance because of its immutable master dataset. Moreover, the distributed nature of
1. http://hadoop.apache.org/
the database used in the Lambda Architecture facilitates scaling the storage capacity, whereas scaling a centralized database incurs an expensive maintenance cost.
The cloud platform (or cluster) chosen to deploy our engine should provide computing resources that scale on demand, as well as load balancing. Because of these requirements, our main option is the AWS EC2 platform, which offers scalable and highly configurable cloud computing services, as well as elastic load balancing.2 In August 2014, our project received an Amazon Research Grant for $2,500 in AWS credits.
2.2 Storm 101
Storm is a distributed system for the real-time processing of streams. A stream is an infinite sequence of tuples. A tuple is a list of named values (fields) that can be objects of any type. Storm offers primitives for transformations on streams, such as filtering or aggregations, among others. Spouts act as stream sources, whereas bolts represent processing nodes. Networks of spouts and bolts are packed into topologies, which are directed acyclic computation graphs. Figure 2.2 shows an example of a Storm topology.
Figure 2.2: Example of Storm topology (from http://storm.incubator.apache.org/).
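The tuple/bolt abstractions above can be sketched in a few lines of plain Python (an illustrative simplification; the actual operators of this deliverable are Java bolts running inside Storm, and all names in this sketch are hypothetical):

```python
class Tuple:
    """A Storm tuple: a list of named values (fields) of any type."""
    def __init__(self, **fields):
        self.fields = fields

class FilterBolt:
    """A bolt applying one of Storm's primitive transformations: filtering."""
    def __init__(self, field, predicate):
        self.field, self.predicate = field, predicate

    def process(self, stream):
        # A bolt consumes an (in principle infinite) stream of tuples
        # and emits a new stream; here we keep only the matching tuples.
        for t in stream:
            if self.predicate(t.fields[self.field]):
                yield t

# A spout would feed an endless stream; a finite list stands in for it here.
readings = [Tuple(sensor="s1", value=3.0), Tuple(sensor="s2", value=11.5)]
bolt = FilterBolt("value", lambda v: v > 10.0)
print([t.fields["sensor"] for t in bolt.process(readings)])  # ['s2']
```

A topology simply chains such bolts, with Storm handling distribution, acknowledgement, and parallelism of each stage.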
Before working with Storm, we analyzed the state of the art in distributed real-time data processing. Many promising projects have arisen in recent years, such as Spark Streaming,3 S4,4 and Samza,5 among others. However, in most cases we found that they were still immature and not widely adopted at the time we started the implementation. Currently, Spark Streaming is gaining relevance due to its integration with the Spark batch processing system, and it is one of the candidate technologies to consider for future work.
In the field of RDF stream processing, there are also some systems worth highlighting: CQELS Cloud [9] and ETALIS6 [1] (both using Storm), INSTANS [13], and morph-streams [3]. On the one hand, INSTANS and morph-streams are not distributed, which hinders their scaling capabilities. On the other hand, ETALIS is an event-oriented system, but our goal is to process pure data streams
2. http://aws.amazon.com/es/ec2/
3. http://spark.apache.org/streaming/
4. http://incubator.apache.org/s4/
5. http://samza.incubator.apache.org/
6. http://code.google.com/p/etalis/
and avoid event modeling where possible. To the best of our knowledge, CQELS Cloud is the reference system among distributed RDF stream processing engines. Our purpose is to improve on CQELS Cloud's performance by using adaptive processing techniques (see section 4), but this is out of the scope of this document at the current stage of implementation of our engine.
We decided to use Storm because of its simple processing model, the ease of parallelizing real-time computations, the open source community backing the project, and the number of relevant companies and projects using the system (e.g. Twitter). Storm offers fault-tolerance mechanisms by acknowledging every tuple that is processed. It provides high throughput7 and parallelism configuration parameters to scale topologies easily. More information about Storm can be found in the online documentation.8
In the next section, we describe a set of Storm operators for executing queries over RDF streams. These real-time operators will be part of the speed layer in our Lambda Architecture.
7. Up to a million tuples processed per second per node, see http://storm.incubator.apache.org/about/scalable.html.
8. http://storm.incubator.apache.org/documentation/Tutorial.html
3 Towards an efficient RDF Stream Processing Engine
Heterogeneous data streams are generated from different sources, at different rates, and cover multiple domains. Our goal is to develop a distributed RDF stream processing engine capable of parallelizing continuous queries. We address three aspects of stream processing in this prototype: parallelism, data compression, and a modular architecture design that will help to integrate historical data in future versions of the system.
3.1 A catalogue of Storm-based operators for queries over RDF streams
In section 2.2, we introduced the basic features of Storm. In the following subsections, we describe the implementation of Storm bolts for specific tasks, such as triple aggregation, time-windowing, joining, and projecting. At the end of the section, we illustrate how these bolts can operate together with an example Storm topology.
3.1.1 Triple2Graph operator
The W3C RDF Stream Processing (RSP) Community Group has the goal of defining a common model and best practices for RDF Stream Processing.1 Recently, there has been an agreement on the model to use for representing RDF streams. The proposed model comprises sets of triples in a graph plus the graph production timestamp: <Graph, t>. For this reason, we developed a Storm operator (bolt) that consumes streams of triples and returns streams of timestamped graphs, the Triple2Graph operator, see figure 3.1.2 The parameter of the operator is a triple pattern that defines the start of each graph.
Figure 3.1: Triple2Graph operator I/O schema.
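The grouping logic of the operator can be sketched as follows (a Python simplification of the Java Triple2GraphBolt; the start pattern is reduced to a (predicate, object) pair, and all names here are illustrative assumptions):

```python
def triple2graph(triples, start_pattern):
    """Turn a stream of timestamped (s, p, o, t) triples into a stream of
    timestamped graphs <Graph, t>. A new graph begins whenever a triple
    matches start_pattern, simplified here to a (predicate, object) pair."""
    graph, ts = [], None
    for s, p, o, t in triples:
        if (p, o) == start_pattern and graph:
            yield (graph, ts)          # emit the completed graph
            graph, ts = [], None
        if ts is None:
            ts = t                     # graph production timestamp
        graph.append((s, p, o))
    if graph:
        yield (graph, ts)              # flush the last open graph

stream = [
    (":obs1", "a", "ssn:Observation", 1),
    (":obs1", "ssn:observedBy", ":sensor1", 1),
    (":obs2", "a", "ssn:Observation", 2),
    (":obs2", "ssn:observedBy", ":sensor2", 2),
]
graphs = list(triple2graph(stream, ("a", "ssn:Observation")))
print(len(graphs))  # 2
```

Each emitted pair is one <Graph, t> element of the output stream, here two graphs of two triples each, timestamped 1 and 2.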
3.1.2 Time-windowing operator
One common aspect of real-time processing queries is the continuous generation of results. Querying an unbounded stream is technically not possible because of the infinite nature of the stream. In stream processing engines, this is addressed by means of time windows that delimit streams temporally, e.g. a 30-second window includes all the messages in the stream from the past 30 seconds until now.
This operator, see figure 3.2, takes as input a stream of timestamped graphs and returns the set of graphs within the window size time period defined in the parameters.3 To configure the frequency of emission, a second parameter, called emission time, is needed.
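The windowing behaviour can be sketched like this (an illustrative Python simplification of the RollingWindowBolt logic; the function name and the integer-second clock are assumptions):

```python
def emit_window(buffer, now, window_size):
    """Drop graphs older than the window and return the current window
    contents, i.e. the timestamped graphs within the last window_size
    seconds. The bolt would call this every `emission time` seconds."""
    buffer[:] = [(g, t) for (g, t) in buffer if now - t <= window_size]
    return [g for (g, t) in buffer]

# Timestamped graphs received so far, e.g. for a 30 s window at now=60:
buffer = [("g1", 10), ("g2", 35), ("g3", 50)]
print(emit_window(buffer, now=60, window_size=30))  # ['g2', 'g3']
```

With a 60-second window and a 10-second emission time, as in the query example of section 3.1.5, this emission fires every 10 seconds over the last 60 seconds of graphs.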
1. http://www.w3.org/community/rsp/
2. Source code available at https://github.com/allaves/storm-query-operators/blob/master/src/bolt/Triple2GraphBolt.java
3. Source code available at https://github.com/allaves/storm-query-operators/blob/master/src/bolt/RollingWindowBolt.java
Figure 3.2: Time windowing operator I/O schema.
3.1.3 Simple Join operator
The Simple Join operator, see figure 3.3, processes pairs of tuple sets and produces a stream of joined tuple sets.4 We do not discuss at this point which kind of join operation is used; it can be a nested loop join, merge join, or hash join, among many others. The focus here is on the operator input and output, and how these affect the design of the topology.
Figure 3.3: Simple join operator I/O schema.
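As one concrete instance of the join, a hash join over two tuple sets sharing a key field could look like this (a sketch only; as noted above, the deployed bolt may use a different join algorithm, and the names here are illustrative):

```python
def hash_join(left, right, key):
    """Join two sets of tuples (dicts) on a common key field by building
    a hash index over the left side and probing it with the right side."""
    index = {}
    for t in left:
        index.setdefault(t[key], []).append(t)
    for t in right:
        for match in index.get(t[key], []):
            yield {**match, **t}   # merged (joined) tuple

obs = [{"sensor": ":s1", "value": 1.4}]
sensors = [{"sensor": ":s1", "location": "POINT(27.7 -18.1)"}]
print(list(hash_join(obs, sensors, "sensor")))
```

Whatever the algorithm, the operator's contract is the same: two tuple sets in, one stream of joined tuple sets out.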
3.1.4 Project operator
The Project operator, see figure 3.4, is the simplest of the operators described in this section. Its function consists of selecting a subset of fields from the input tuples and emitting them as output tuples.5 It is a direct implementation of the SELECT clause in SPARQL.
Figure 3.4: Project operator I/O schema.
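The projection is essentially a one-liner over each tuple (illustrative Python; the actual operator is the Java ProjectionBolt):

```python
def project(tuples, fields):
    """Keep only the given fields of each tuple, as SPARQL's SELECT does."""
    for t in tuples:
        yield {f: t[f] for f in fields}

joined = [{"sensor": ":s1", "value": 1.4, "location": "POINT(27.7 -18.1)"}]
print(list(project(joined, ["value", "location"])))
```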
3.1.5 A Storm topology example to resolve a SPARQL query
A topology is a directed acyclic graph where spouts and bolts are the vertices, and streams are the links. To illustrate the transformation from queries to topologies, we selected for this section a simple RDF model for sensor observations, see figure 3.5.
4. Source code available at https://github.com/allaves/storm-query-operators/blob/master/src/bolt/RDFSimpleJoinBolt.java
5. Source code available at https://github.com/allaves/storm-query-operators/blob/master/src/bolt/ProjectionBolt.java
Figure 3.5: Concept map of the RDF model for the topology example, based on the SSN Ontology [4].
The data is split into two RDF data streams: <obs> and <sensors>. <obs> is a stream that emits triples describing sensor observations. Each observation has a value and a timestamp, and is produced by a sensor. The following listing is an example of <obs> triples:
:obs1 a ssn:Observation ;
    ssn:observationResult :sensorOutput1 ;
    ssn:observationSamplingTime :timestamp1 ;
    ssn:observedBy :sensor1 .
:sensorOutput1 ssn:hasValue :obsValue1 .
:obsValue1 dul:hasDataValue "1.4"^^xsd:float .
:timestamp1 time:inXSDDateTime "Sun Sep 01 01:02:02 2013"^^xsd:dateTime .
<sensors> is a stream of sensor locations. The tuples in the stream contain sensor identifiers attached to a spatial location. The triples emitted by <sensors> follow this structure:
:sensor1 a ssn:Sensor ;
    geo:hasGeometry :geometry1 .
:geometry1 geo:asWKT "<EPSG:4258> POINT(27.7 -18.1)"^^geo:wktLiteral .
A client submits a query over the two RDF streams, <obs> and <sensors>, with a time window of 60 seconds each and a sliding period of 10 seconds. The following listing in SPARQLStream [3] defines the query, which uses some of the operators explained in the previous section:
SELECT ?value ?location ?time
FROM NAMED STREAM <http://streamreasoning.org/data/obs.srdf> [NOW - 60 S]
FROM NAMED STREAM <http://streamreasoning.org/data/sensor.srdf> [NOW - 60 S]
WHERE {
    ?obs a ssn:Observation .
    ?obs ssn:observationResult ?sensorOutput .
    ?obs ssn:observationSamplingTime ?timestamp .
    ?obs ssn:observedBy ?sensor .
    ?sensorOutput ssn:hasValue ?obsValue .
    ?obsValue dul:hasDataValue ?value .
    ?timestamp time:inXSDDateTime ?time .
    ?sensor geo:hasGeometry ?geometry .
    ?geometry geo:asWKT ?location . }
Figure 3.6: Topology example for the query described above.
Every 10 seconds, the query returns the values observed at the corresponding sensor locations during the last 60 seconds. The graph in figure 3.6 represents one possible topology for the query described above. On the left side, two spouts generate the RDF data streams for <sensors> and <obs>. The Triple2Graph bolts transform triples into graphs and send the results to the Windowing bolts. The windowed results <t0, t1, t2,..., tn> are joined in the Simple Join bolts, using the sensor identifier as the join attribute. Finally, the fields obs.value and sensors.location are projected from the joined tuples to the output.
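The joining and projection stages of this topology can be sketched end to end (a plain-Python approximation of the Storm data flow; the field names follow the query above, everything else is an illustrative assumption):

```python
def run_join_and_project(obs_window, sensor_window):
    """Join the two 60 s windows on the sensor identifier, then project
    the observation value, location, and time, as the query requests."""
    by_sensor = {t["sensor"]: t for t in sensor_window}   # hash index
    out = []
    for t in obs_window:
        if t["sensor"] in by_sensor:                      # Simple Join
            joined = {**t, **by_sensor[t["sensor"]]}
            out.append({"value": joined["value"],         # Project
                        "location": joined["location"],
                        "time": joined["time"]})
    return out

obs_window = [{"sensor": ":sensor1", "value": 1.4, "time": "01:02:02"}]
sensor_window = [{"sensor": ":sensor1", "location": "POINT(27.7 -18.1)"}]
print(run_join_and_project(obs_window, sensor_window))
```

In the real topology each stage is a separate bolt, so every step can be parallelized independently by adjusting its Storm parallelism hint.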
3.2 Architecture Overview
The architecture for our distributed RDF stream processing engine is an extension of the architecture of morph-streams [3]. Figure 3.7 includes a data flow diagram of the real-time processing layer.
Figure 3.7: Architecture diagram of the real-time processing layer.
The prototype takes as input client queries in SPARQLStream [3]. The Query Rewriting module transforms each query into an algebra tree. This algebra tree defines the operations that our engine has to execute to produce query responses. The current version of the system reuses the Query Rewriting
modules from morph-streams, which is a centralized engine. The plan for future versions is to improve the Query Rewriting module in order to obtain algebra trees optimized for resolution in a distributed environment.
The Query Processing module manages the data stream input, via Storm spouts or other streaming engines, and an algebra tree for each query registered in our engine. The result of the query processing is a mapping between the algebra tree and the available query operators. This mapping is represented as a Storm topology. Currently, our system is able to resolve SPARQLStream queries that involve the operators presented in the previous section, with a maximum of two input streams. We are working to extend the range of queries that can be submitted.
Storm topologies generated in the Query Processing module are sent to the Topology Manager. The Topology Manager handles the topology deployment details, such as the level of parallelization, and submits a distributed task to be executed in the Amazon AWS cloud. The results of the topology execution are returned to the Result Handler. The communication between the Result Handler and the client can be configured a priori, for instance, to receive results as they are generated or periodically.
3.3 RDF Stream Compression
In section 1, we mentioned the latency increase caused by the continuous transmission of data in distributed environments (C.2), such as the execution environment of our system. This section presents an overview of a compression format that alleviates the cost of publishing and interchanging RDF data streams: the Efficient RDF Interchange (ERI) format. ERI exploits the structural regularity and the redundancy of values in RDF data streams. We summarize in this section the main features of the ERI format and point to [6] (included in the Annex) for more details.
To date, universal compressors (e.g. gzip) and specific RDF compressors (e.g. HDT [7]) are commonly used to reduce RDF exchange costs and delays on the network. These approaches, though, take a static view of RDF datasets, disregarding the dynamic nature of RDF stream management. A recent work [8] points out the importance of efficient RDF stream compression and proposes an initial solution leveraging the inherent data redundancy of RDF data streams. The ERI format builds on this proposal and has proven to achieve optimized streaming compression while remaining efficient in performance.
Figure 3.8: ERI processing model.
The ERI Processor consists of two modules that encode an RDF data stream into a compressed data stream, or decode a compressed data stream back into an RDF data stream. Figure 3.8 shows a
diagram of the ERI processing model. The ERI processor leverages two information sets to enhance compactness: the Structural Dictionary and the Presets. The Structural Dictionary stores a dynamic catalog of the structural patterns found in each set of triples. The Presets exploit the common predicates that are repeated throughout the stream. We assume that these predicates are identified by data providers; otherwise, they can be statistically detected at run time.
ERI follows an encoding procedure similar to the W3C Recommendation EXI [14]: each set of
triples is multiplexed into channels. On the one hand, structural channels hold the information related
to the structure of the triple set and keep the Structural Dictionary updated. On the other hand, value
channels manage the information of the values in the triple set.
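The core idea, separating recurring structure from values, can be sketched as follows (a heavily simplified Python illustration of the Structural Dictionary and the channel split; this is not the ERI encoding itself, and all names are hypothetical):

```python
def encode_block(molecules, dictionary):
    """Encode subject-grouped triples by replacing each recurring
    predicate pattern with an id from the shared, growing dictionary.
    Structure and values end up in separate channels, as in ERI/EXI."""
    structure_channel, value_channel = [], []
    for subject, pairs in molecules:          # pairs: ordered (pred, value)
        pattern = tuple(p for (p, v) in pairs)
        if pattern not in dictionary:
            dictionary[pattern] = len(dictionary)   # new structural pattern
        structure_channel.append((subject, dictionary[pattern]))
        value_channel.append([v for (p, v) in pairs])
    return structure_channel, value_channel

dictionary = {}
block = [
    (":obs1", [("ssn:hasValue", "1.4"), ("ssn:observedBy", ":sensor1")]),
    (":obs2", [("ssn:hasValue", "2.0"), ("ssn:observedBy", ":sensor2")]),
]
structs, values = encode_block(block, dictionary)
print(structs)  # [(':obs1', 0), (':obs2', 0)]
```

Because both observations share one structural pattern, the structure is transmitted once as a dictionary id; only the values differ per item, which is exactly the regularity that sensor streams exhibit.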
For the evaluation, which can be seen in detail in the Annex and at http://dataweb.infor.uva.es/wp-content/uploads/2014/07/iswc14.pdf, we used three types of datasets: streaming datasets, statistical datasets, and general static datasets. We tested different configurations of ERI, regarding the size of the transmitted data blocks, and compared them to other stream compression approaches, i.e. HDT [7] and RDSZ [8]. More details can be found in the Annex.
Experiments show that ERI produces state-of-the-art compression for RDF streams, and that it excels for regularly-structured static RDF datasets. Additionally, ERI compression ratios remain competitive on general datasets. Last but not least, the resulting time overheads of ERI processing are relatively low and acceptable in many scenarios.
4 Conclusions and Future Work
In this deliverable, we have presented the first steps towards a distributed RDF streaming engine, including (i) a set of Storm operators for processing RDF streams according to the needs of stream-based extensions of SPARQL (e.g. SPARQLStream) and (ii) an extensible architecture proposal. Our medium-term goal is to build an engine capable of adapting to changing conditions while serving complex continuous queries. To achieve efficient query processing over data streams (section 1, C.1), we will focus on query execution planning. Traditional databases include a query optimizer that designs an execution plan based on the registered query and data statistics. In a distributed stream processing environment, there are several additional aspects to consider: changing rates of the input data, failure of processing nodes, and distribution of workload, among others. Adaptive Query Processing (AQP) techniques [2, 5] allow adjusting the query execution plan to varying conditions of the data input, the incoming queries, and the system. AQP is also used to correct query optimizer mistakes and cope with unknown statistics [2]. While new data enters the system, a dedicated bolt will manage stream data statistics to reassign a different topology if data stream conditions vary. In Storm, the coordination between the master node, which controls the assignment of tasks to spouts and bolts, and the worker nodes is managed by Zookeeper.1 However, Zookeeper does not provide elastic load balancing off-the-shelf. The use of cloud services for this purpose, such as the one offered by Amazon EC2, will be addressed in the near future.
The implementation described in this deliverable has not been evaluated yet because development is still ongoing. However, the preliminary evaluation plan includes the following steps:
• Select a subset of SRBench queries.2 This benchmark collects 17 queries of different complexity in three diverse SPARQL-like languages for streams. For the evaluation, we will select a subset of these queries in SPARQLStream.
• Create the corresponding topologies. For each query, we will generate one or more Storm topologies and check that the stream processing flow corresponds to the query purpose.
• Test them in a distributed environment with synthetic data. The initial plan is to use a synthetic stream data source, such as the SRBench data generator. In case of problems with the SRBench generator, another data source would be considered and the queries adapted accordingly.
• Compare to other RDF streaming engines. The final step of our evaluation plan would be benchmarking our system and comparing it to CQELS Cloud [9], which is the current reference among distributed RDF stream processing engines.
Additionally, we described a new RDF stream compression format named Efficient RDF Interchange (ERI) that (i) minimizes the data exchange among processing nodes (section 1, C.2) while (ii) serving a small set of operators on the compressed data. An ERI processor will be integrated as part of the distributed engine in the near future.
We focused in this document on real-time processing, which corresponds to the speed layer in the Lambda Architecture. To address the challenge of integrating real-time and historical data (section 1, C.3), we plan to extend our engine with a serving layer and a batch layer, as described in section 2.1.
1. http://zookeeper.apache.org/
2. http://www.w3.org/wiki/SRBench
Acknowledgements
The presented research has been supported by an AWS in Education Research Grant award.
References
[1] Darko Anicic, Paul Fodor, Sebastian Rudolph, and Nenad Stojanovic. EP-SPARQL: A Unied
Language for Event Processing and Stream Reasoning. In Proceedings of the 20th International
Conference on World Wide Web, WWW '11, pages 635644, New York, NY, USA, 2011. ACM.
[2] Shivnath Babu and Pedro Bizarro. Adaptive Query Processing in the Looking Glass. In Proceedings of the Second Biennial Conference on Innovative Data Systems Research (CIDR), Jan. 2005,
2005.
[3] Jean-Paul Calbimonte, Hoyoung Jeung, Oscar Corcho, and Karl Aberer. Enabling Query Technologies for the Semantic Sensor Web. International Journal on Semantic Web and Information
Systems, 8(1):43–63, 2012.
[4] Michael Compton, Payam Barnaghi, Luis Bermudez, Raul Garcia-Castro, Oscar Corcho, Simon
Cox, John Graybeal, Manfred Hauswirth, Cory Henson, Arthur Herzog, Vincent Huang, Krzysztof
Janowicz, W. David Kelsey, Danh Le Phuoc, Laurent Lefort, Myriam Leggieri, Holger Neuhaus,
Andriy Nikolov, Kevin Page, Alexandre Passant, Amit Sheth, and Kerry Taylor. The SSN ontology
of the W3C semantic sensor network incubator group. Web Semantics: Science, Services and Agents
on the World Wide Web, 17(0), 2012.
[5] Amol Deshpande, Zachary Ives, and Vijayshankar Raman. Adaptive Query Processing. Foundations and Trends in Databases, 1(1):1–140, January 2007.
[6] Javier D. Fernández, Alejandro Llaves, and Oscar Corcho. Efficient RDF Interchange (ERI) Format
for RDF Data Streams (accepted). In ISWC 2014, 2014.
[7] Javier D. Fernández, Miguel A. Martínez-Prieto, Claudio Gutiérrez, Axel Polleres, and Mario Arias.
Binary RDF representation for publication and exchange (HDT). Web Semantics: Science, Services and Agents on the World Wide Web, 19(0):22–41, 2013.
[8] N. Fernández García, J. Arias Fisteus, L. Sánchez Fernández, D. Fuentes-Lorenzo, and O. Corcho.
RDSZ: An approach for lossless RDF stream compression. In 11th European Semantic Web
Conference (ESWC 2014), pages 1–16, Crete, Greece, 2014.
[9] Danh Le-Phuoc, Hoang Nguyen Mau Quoc, Chan Le Van, and Manfred Hauswirth. Elastic and
Scalable Processing of Linked Stream Data in the Cloud. In Harith Alani, Lalana Kagal, Achille
Fokoue, Paul Groth, Chris Biemann, Josiane Xavier Parreira, Lora Aroyo, Natasha Noy, Chris
Welty, and Krzysztof Janowicz, editors, The Semantic Web – ISWC 2013,
pages 280–297. Springer Berlin Heidelberg, 2013.
[10] Danh Le-Phuoc, Josiane Xavier Parreira, and Manfred Hauswirth. Linked Stream Data Processing.
In Reasoning Web. Semantic Technologies for Advanced Query Answering, pages 245–289. Springer
Berlin Heidelberg, 2012.
[11] Alessandro Margara, Jacopo Urbani, Frank van Harmelen, and Henri Bal. Streaming the web:
Reasoning over dynamic data. Web Semantics: Science, Services and Agents on the World Wide
Web, in press.
[12] Nathan Marz and James Warren. Big Data: principles and best practices of scalable realtime
data systems. Manning Publications, 2013.
[13] Mikko Rinne, Esko Nuutila, and Seppo Törmä. INSTANS: High-Performance Event Processing with
Standard RDF and SPARQL. In Cory Henson, Kerry Taylor, and Oscar Corcho, editors, Proceedings of the 5th International Workshop on Semantic Sensor Networks, pages 81–96, Boston,
Massachusetts, USA, 2012. CEUR-WS.
[14] J. Schneider, T. Kamiya, D. Peintner, and R. Kyusakov. Efficient XML Interchange (EXI) Format
1.0 (Second Edition). W3C Recommendation, 2014.
[15] Juan F Sequeda and Oscar Corcho. Linked Stream Data: A Position Paper. In Proceedings of
the 2nd International Workshop on Semantic Sensor Networks, SSN 09, Washington, USA, 2009.
CEUR-WS.
[16] Amit Sheth, Cory Henson, and Satya S. Sahoo. Semantic Sensor Web. IEEE Internet Computing,
12(4):78–83, 2008.
5 ANNEX: Efficient RDF Interchange (ERI) Format for RDF Data Streams.
ISWC 2014 paper.
Efficient RDF Interchange (ERI) Format for
RDF Data Streams

Javier D. Fernández, Alejandro Llaves, Oscar Corcho
Ontology Engineering Group (OEG), Univ. Politécnica de Madrid (Spain)
{jdfernandez,allaves,ocorcho}@fi.upm.es
Abstract. RDF streams are sequences of timestamped RDF statements
or graphs, which can be generated by several types of data sources (sensors, social networks, etc.). They may provide data at high volumes and
rates, and be consumed by applications that require real-time responses.
Hence it is important to publish and interchange them efficiently. In this
paper, we exploit a key feature of RDF data streams, namely the regularity of their structure and data values, proposing a compressed, efficient
RDF interchange (ERI) format, which can reduce the amount of data
transmitted when processing RDF streams. Our experimental evaluation
shows that our format produces state-of-the-art streaming compression
while remaining efficient in processing time.
1 Introduction
Most of the largest RDF datasets available so far (e.g. Bio2RDF1, LinkedGeoData2, DBpedia3) are released as static snapshots of data coming from one
or several data sources, generated with some ETL (Extract-Transform-Load)
processes according to scheduled periodical releases. That is, data are mostly
static, even when they contain temporal references (e.g. the Linked Sensor Data
dataset, which contains a historical archive of data measured by environmental sensors). Typical applications that make use of such datasets include those
performing simulations or those training numerical models.
In contrast, some applications only require access to the most recent data,
or combine real-time and historical data for different purposes. In these cases
a different approach has to be followed for RDF data management, and RDF
streams come into play. RDF streams are defined as potentially unbounded sequences of time-varying RDF statements or graphs, which may be generated
from any type of data stream, from social networks to environmental sensors.
Several research areas have emerged around RDF streams, e.g. temporal representation in RDF [12, 6, 11], or RDF stream query languages and processing
engines (C-SPARQL [2], SPARQLStream and morph-streams [4], CQELS Cloud
[15], Ztreamy [1]). The recently-created W3C community group on RDF Stream
1 http://bio2rdf.org/
2 http://linkedgeodata.org/
3 http://www.dbpedia.org/
Processing is working on the provision of “a common model for producing, transmitting and continuously querying RDF Streams”4 .
In this paper we focus on the efficient transmission of RDF streams, a necessary step to ensure higher throughput for RDF Stream processors. Previous
work on RDF compression [9, 14] shows important size reductions of large RDF
datasets, hence enabling an efficient RDF exchange. However, these solutions
consider static RDF datasets, and need to read the whole dataset to take advantage of data regularities. A recent proposal, RDSZ [10], shows the benefits
of applying the general-purpose stream compressor Zlib [8] to RDF streams,
and provides a compression algorithm based on the difference of subject groups
(provided in Turtle [17]), with some gains in compression (up to 31% w.r.t. Zlib).
Our work builds on the basis of RDSZ and exploits the fact that in most RDF
streams the structural information is well known by the data provider, and the
number of variations in the structure is limited. For instance, the information
provided by a sensor network is restricted to the number of different measured
properties, and in an RDF context the SSN ontology [5] will probably be used for
representing such data. Furthermore, given that “regularities” are also present
in very structured static datasets (e.g. statistical data using the RDF Data
Cube Vocabulary [7]), our approach may also be applicable to those datasets.
Thus, our preliminary hypothesis states that our proposed RDF interchange
format can optimize the space and time required for representing, exchanging,
and parsing RDF data streams and regularly-structured static RDF datasets.
In this paper, we propose a complete efficient RDF interchange (ERI) format
for RDF streams. ERI considers an RDF stream as a continuous flow of blocks
(with predefined maximum size) of triples. Each block is modularized into two
main sets of channels to achieve large spatial savings:
– Structural channels: They encode the subjects in each block and, for each
one, the structural properties of the related triples, using a dynamic dictionary of structures.
– Value channels: They encode the concrete data values held by each predicate
in the block in a compact fashion.
We also provide a first practical implementation with some decisions regarding the specific compression used in each channel. An empirical evaluation over
a heterogeneous corpus of RDF streaming datasets shows that ERI produces
state-of-the-art compression, remaining competitive in processing time. Similar
conclusions can be drawn for very regular datasets (such as statistical data) and
general datasets in which the information is strongly structured.
Our main contributions are: (i) the design of an efficient RDF interchange
(ERI) format as a flexible, modular and extensible representation of RDF streams;
(ii) a practical implementation for ERI which can be tuned to cope with specific
dataset regularities; and (iii) an evaluation that shows our gains in compactness
w.r.t. current compressors, with low processing overheads.
The rest of the paper is organized as follows. Section 2 reviews basic foundations of RDF streaming and compression. Our approach for efficient RDF
4 http://www.w3.org/community/rsp/
interchange (ERI) is presented in Section 3, as well as a practical deployment for
ERI encoding and decoding. Section 4 provides an empirical evaluation analyzing
compactness and processing efficiency. Finally, Section 5 concludes and outlines
future work and application scenarios.
2 Background and Related Work
A key challenge for stream processing systems is the ability to consume large volumes of data with varying and potentially large input rates. Distributed stream
processing systems are a possible architectural solution. In these systems, the
circulation of data between nodes takes an amount of time that depends on
parameters like data size, network bandwidth, or network usage, among others.
Hence it is crucial to minimize data transmission time among processing nodes.
To reach this goal, our work focuses on RDF stream compression techniques.
RDF compression is an alternative to standard compression such as gzip. It
leverages the skewed structure of RDF graphs to get large spatial savings. The
most prominent approach is HDT [9], a binary format that splits and succinctly
represents an RDF dataset with two main components: the Dictionary assigns
an identifier (ID) to all terms in the RDF graph with high levels of compression,
and the Triples uses the previous mapping and encodes the pure structure of
the underlying RDF graph. HDT achieves good compression figures while providing retrieval features over the compressed data [9]. However, this comes at the
cost of processing the complete dataset and spending non-negligible processing
time. The same applies to other recent RDF compression approaches based on
inferring a grammar generating the data [14] or providing other dictionary-based
compression on top of MapReduce [19].
Streaming HDT [13] is a variant of HDT that simplifies the associated
metadata and restricts the range of available dictionary IDs. Thus, the scheme
is a simple dictionary-based replacement which does not compete in compression but allows operation on constrained devices. RDSZ [10] is the first specific
approach for RDF streaming compression. RDSZ takes advantage of the fact
that items in an RDF stream usually follow a common schema and, thus, have
structural similarities. Hence it uses differential encoding to take advantage of
these similarities, and the results of this process are compressed with Zlib to
exploit additional redundancies. Experiments show that RDSZ produces gains
in compression (17% on average) at the cost of increasing the processing time.
The increasing interest in RDF compression over streaming data has also
been recently highlighted by RDF stream processing systems such as CQELS
Cloud [15] and Ztreamy [1]. The former uses a basic dictionary-based approach
to process and move fixed-size integers between nodes. The latter exploits the
Zlib compressor with similar purposes. In addition, it is also relevant to detect
trends in data, extract statistics, or compare historic data with current data
to identify anomalies, although historical data management is not considered in
most stream processing systems [16]. A potential use case of RDF compression
may be the integration of historical data and real-time data streams.
Fig. 1. ERI processing model.
3 Efficient RDF Interchange (ERI) Format
The ERI format is a compact RDF representation designed to leverage the inherent structural and data redundancy in RDF streams. In the following, we
introduce the basic concepts behind the ERI format, and we present a practical
implementation for encoding and decoding RDF data.
3.1 Basic Concepts
In ERI, we consider the generic processing model depicted in Figure 1. In this
scheme, RDF data, potentially in the form of a data stream, is encoded to ERI,
resulting in a compressed data stream, or decoded back from it. We refer to an ERI
processor as any application able to encode such RDF data to ERI or to decode
the ERI compressed stream (defined below) to make the RDF data accessible. A
processor mainly leverages two information sets to improve compactness: (i)
the Structural Dictionary and (ii) the Presets, defined as follows.
The Structural Dictionary holds a dynamic catalog of all different structural patterns found for a given set of triples called Molecules.
Definition 1 (RDF (general) molecule). Given an RDF graph G, an RDF
molecule M ⊆ G is a set of triples {t1, t2, ..., tn}.
Molecules are the unit elements for encoding; each molecule will be codified as
its corresponding identifier (ID) in the dictionary of structures and the concrete
data values held by each predicate.
The most basic (but inefficient) kind of grouping is at the level of triples (one
group per triple), i.e. having as many molecules as the total number of triples in
the RDF data. In this case, the Structural Dictionary will assign an ID to each
structure which is just the predicate in the triple. Trying to set up larger groups
sharing regularities is much more appropriate.
A straightforward approach is to consider the list of all triples with the
same subject (similar to abbreviated triple groups in Turtle [17]). We take this
grouping as the default method, thus managing RDF subject-molecules:
Definition 2 (RDF subject-molecule). Given an RDF graph G, an RDF
subject-molecule M ⊆ G is a set of triples {t1, t2, ..., tn} in which subject(t1) =
subject(t2) = ... = subject(tn).
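As a minimal illustration of this default grouping policy, the following Python sketch (our own illustration, not the authors' implementation) partitions a block of triples into subject-molecules:

```python
from collections import OrderedDict

def subject_molecules(block):
    """Partition a block of (subject, predicate, object) triples into
    subject-molecules (Definition 2): groups of triples that share the
    same subject, preserving first-seen subject order."""
    groups = OrderedDict()
    for s, p, o in block:
        groups.setdefault(s, []).append((s, p, o))
    return list(groups.values())
```

Note that, as discussed below for ERI streams, such grouping may reorder triples whose subjects interleave in the original stream.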
Fig. 2. Example of molecules and their Structural Dictionary.
Note that an RDF stream can be seen as a sequence of (potentially not disjoint) RDF subject-molecules5 . Figure 2 illustrates two molecules in a sequence
of weather sensor data and their entry in the Structural Dictionary. This data
excerpt (inspired by the data examples in SRBench [20]) represents temperature measures of a sensor at two sampling times. As can be seen, the list of
predicates is exactly the same for both molecules. In addition, we can observe
regularities in certain property values (in different color). In particular, the values for rdf:type, om-owl:observedProperty and om-owl:procedure are exactly the
same, and will be repeated throughout the data stream for all the air temperature observations of the same sensor. We call such predicates, which produce
massive data repetitions, discrete predicates. Thus, we avoid these repetitions
and save space by codifying the concrete values for discrete predicates as part of the
structural patterns, as shown in Figure 2 (right). In this example, the structure
in the dictionary is encoded as the list of related predicates and, for each one,
the number of objects for the predicate and the aforementioned fixed
property value if the predicate is discrete.
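To make the pattern derivation concrete, here is a hedged Python sketch of how a molecule's structural pattern might be computed; the function name and tuple layout are illustrative, not the ERI wire format:

```python
from collections import OrderedDict

def structure_key(molecule, discrete_predicates):
    """Derive the structural pattern of a subject-molecule: the ordered
    list of predicates with, for each one, its number of objects, plus
    the concrete values themselves when the predicate is discrete
    (i.e. its values repeat massively across the stream)."""
    objs_per_pred = OrderedDict()
    for _s, p, o in molecule:
        objs_per_pred.setdefault(p, []).append(o)
    key = []
    for p, objs in objs_per_pred.items():
        if p in discrete_predicates:
            key.append((p, len(objs), tuple(objs)))  # values folded into the pattern
        else:
            key.append((p, len(objs), None))         # values go to value channels
    return tuple(key)
```

Two molecules with the same predicates, object multiplicities, and discrete values then map to the same entry in the Structural Dictionary.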
We assume that discrete predicates can be easily identified by streaming
data providers and set up before encoding, or that they can be statistically detected
at run time. In any case, this kind of information must be shared between
encoders and decoders, and is kept in the information set called Presets. Presets
include all the configuration and compression-oriented metadata supplied by
the data provider or inferred at run time. We distinguish between (a) mandatory
features in Presets, which include the aforementioned set of discrete predicates
and the selected policy for grouping triples into molecules, and (b) applicationspecific configurations. The latter opens up the format for extensions as long as
the concrete application clearly states the decisions and available configurations.
For instance, specific features could include common prefixes, suffixes or infixes
in URIs and BNodes, or a set of common datatypes in some concrete predicates.
5 For simplicity, we will use the term molecules hereinafter, assuming that they are
subject-molecules by default.
3.2 ERI Streams
At a high level, an ERI Stream is a sequence of contiguous blocks of molecules,
as depicted in Figure 3. That is, ERI first splits the incoming RDF data into
contiguous blocks of a maximum predefined blockSize, measured in number of
triples and set up in the encoder. Then, the molecules (groups) within each
block are identified according to the established grouping policy. Note that the
default grouping could slightly alter the original order of triples, since it groups
triples by subject. Other grouping policies may be established for those scenarios
with specific ordering needs.
ERI follows an encoding procedure similar to that of the Efficient XML Interchange (EXI) format [18]: each molecule is multiplexed into channels:
Definition 3 ((general) Channel). A channel is a list of lower entropy items
(similar values), which is well suited for standard compression algorithms.
The idea is to maintain a channel per different type of information, so that a
standard compressor can be used in each channel, leveraging its data regularities
to produce better compression results. In ERI we distinguish between two types
of channels: (i) structural channels and (ii) value channels.
Structural channels hold the information of the structure of the molecules
in the block and keep the Structural Dictionary updated. We define the following
high-level minimum channels:
– Main Terms of molecules: In the default grouping, it states the subject
of each molecule. Other kinds of groupings may assign different values.
– ID-Structures: It lists the ID of the structure of each molecule in the block.
The ID points to the associated structural entry in the Structural Dictionary.
– New Structures: It includes new entries in the Structural Dictionary.
Value channels organize the concrete values in the molecules of the block
for each non-discrete predicate. In short, ERI mainly considers one channel per
different predicate, listing all objects with occurrences in the molecules related
to it. Having property values of a predicate grouped together may help parsers
to directly retrieve information corresponding to specific predicates.
The complete ERI stream consists of an ERI header followed by an ERI body,
as shown in Figure 3 (bottom). The ERI header includes the identification of the
stream and the initial Presets, as previously described. The ERI body carries
the content of the stream, representing each block as (i) a set of metadata
identifying the block and updating potential changes in Presets, and (ii) its
compressed channels, using standard compression for each specific channel.
The decoding process is the exact opposite: the stream body is decompressed
by channels, and demultiplexed into blocks containing the molecules.
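The per-channel compression step can be sketched as follows (a simplification assuming each channel is a list of strings without embedded newlines; the actual format may pair each channel with a compressor suited to its data type):

```python
import zlib

def compress_channels(channels):
    """Compress each channel independently. Because a channel groups
    items of the same kind (low entropy), a standard compressor can
    exploit its internal regularities better than over mixed data."""
    return {name: zlib.compress("\n".join(items).encode("utf-8"))
            for name, items in channels.items()}

def decompress_channels(compressed):
    """Inverse step used by the decoder before demultiplexing."""
    return {name: zlib.decompress(blob).decode("utf-8").split("\n")
            for name, blob in compressed.items()}
```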
3.3 Practical ERI Encoding and Decoding
Now we describe our current deployment for ERI encoding and decoding. For
simplicity, we omit the representation of metadata, as it is relatively
easy to define a suitable set of keywords, and we focus on channel representations.
Fig. 3. Overview of the ERI format.
Figure 4 illustrates our practical decisions over the previous example in Figure 2. The figure represents the structural and value channels of the data excerpt,
as well as the potential standard compression that could be used to produce each
compressed channel. Regarding structural channels, we first follow a straightforward approach for Main Terms of Molecules and list main terms (subjects) in
plain. Advanced representations could consider the usage of an additional dictionary mapping terms to identifiers, and using the corresponding identifier to
refer to a main term previously seen in the input streaming data. However, our
experience with streaming data suggests that main terms are rarely repeated
because they refer to a certain timestamp.
The ID-Structures channel lists integer IDs representing the entry in the
Structural Dictionary. New entries are identified by means of an additional channel called New Structure Marker. This channel has a bit for each ID in the ID-Structures channel: a 0-bit states that the corresponding ID is already in the
Structural Dictionary, whereas a 1-bit shows that the ID points to a new entry
that is retrieved in the New Structures channel. In Figure 4, the first molecule
is related to the structure with ID 30, which is marked as new. Then, the
concrete structure can be found in New Structures. Similarly to the example
in Figure 2, we codify each dictionary entry as the predicates in the structure,
the number of objects for each predicate and the concrete property values for
discrete predicates. To improve compactness in the representation we use a dictionary of predicates, hence the predicate in the structure is not a term but an
ID pointing to the predicate entry in this dictionary. If there is a new predicate
never seen before in a block, it is listed in an additional New Predicates channel,
as shown in Figure 4.
The decoder will maintain a pointer to the next entry to be read in New
Structures (and increment it after reading), and will hold and update the dictionaries of structures and predicates. Given that the number of predicates is
relatively low in RDF datasets, we consider a consecutive list of IDs in the predicate dictionary for the encoder and decoder. For the dictionary of structures,
we use a Least Recently Used (LRU) policy in the encoder.
That is, whenever the maximum capacity is reached, the LRU entry is erased
and the ID is available to encode the next entry, which must be marked as new
in the New Structure Marker channel. Therefore, the decoder can make use of
simple hashing for the dictionary, as it always knows if the entry is new.

Fig. 4. Example of ERI channels.
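The encoder-side dictionary with LRU eviction and ID reuse could look like the following sketch (class and method names are our own, built on Python's OrderedDict):

```python
from collections import OrderedDict

class LRUStructureDictionary:
    """Bounded dictionary of structures. When full, the least recently
    used entry is evicted and its ID is reused for the new entry, which
    is then flagged in the New Structure Marker channel."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()  # structure key -> ID, in LRU order

    def encode(self, key):
        """Return (ID, is_new); is_new drives the marker bit."""
        if key in self.entries:
            self.entries.move_to_end(key)  # mark as recently used
            return self.entries[key], False
        if len(self.entries) < self.capacity:
            new_id = len(self.entries)     # fresh consecutive ID
        else:
            _evicted, new_id = self.entries.popitem(last=False)  # reuse LRU's ID
        self.entries[key] = new_id
        return new_id, True
```

Because the decoder sees the marker bits, it can mirror this behavior with a plain hash map, overwriting an entry whenever a reused ID arrives flagged as new.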
Regarding value channels, Figure 4 illustrates several options. The channel
ID-pred2 storing the values of the second predicate (rdfs:label ) simply lists all
the values. In contrast, ID-pred5 and ID-pred6 make use of a dictionary of objects. This case is similar to the dictionary of structures: the channels hold the
ID of the entry, and an associated list of bits (New Object Marker ID-pred5 and
New Object Marker ID-pred6, respectively) describes whether each ID corresponds to a
new entry in the dictionary. The ERI processor maintains an object dictionary
per predicate. This decision produces shorter IDs per predicate w.r.t. maintaining one general dictionary of objects. In exchange, the processor manages more
dictionaries, although the number of different predicates always remains proportionally low, and so does the number of dictionaries.
maintain one channel (New Terms) with all the new terms in the dictionaries.
As this list is coordinated with the IDs, there are no overlaps; the decoder must
keep a pointer to the next entry to be decoded when a 1-bit in a marker indicates
that there is a new term.
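A hedged sketch of the decoder side of this coordination (function and variable names are ours; for simplicity we pass the slice of the New Terms channel consumed by this value channel):

```python
def decode_value_channel(ids, new_marker, new_terms, obj_dict):
    """Decode one per-predicate value channel. ids and new_marker are
    parallel lists; a 1 in new_marker means the term for that ID is the
    next unread entry of the New Terms channel, otherwise the term is
    already in this predicate's object dictionary."""
    terms = iter(new_terms)
    out = []
    for obj_id, is_new in zip(ids, new_marker):
        if is_new:
            obj_dict[obj_id] = next(terms)  # consume the next new term
        out.append(obj_dict[obj_id])
    return out
```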
Finally, ID-pred7 also holds the object values directly, as in ID-pred2. However, as shown in the figure, it extracts the datatype of all values (xsd:float). We
assume that all property values for a given predicate are of the same kind. In
practice, this means that (i) every channel holds either URIs/BNodes or literals

Category    Dataset                  Triples      Nt Size (MB)  Subjects    Predicates  Objects
Streaming   Mix                      93,048       12            17,153      89          36,279
            Identica                 234,000      25            56,967      12          116,065
            Wikipedia                359,028      33            119,676     3           215,382
            AEMET-1                  1,018,815    133           33,095      59          5,812
            AEMET-2                  2,788,429    494           398,347     7           403,824
            Petrol                   3,356,616    485           419,577     8           355,122
            Flickr Event Media       49,107,168   6,714         5,490,007   23          15,041,664
            LOD Nevada               36,767,060   7,494.5       8,188,411   10          8,201,935
            LOD Charley              104,737,213  21,470        23,306,816  10          23,325,858
            LOD Katrina              172,997,931  35,548        38,479,105  10          38,503,088
Statistics  Eurostat migr reschange  2,570,652    467           285,629     16          2,376
            Eurostat tour cap nuts3  2,849,187    519           316,576     17          47,473
            Eurostat avia paexac     66,023,172   12,785        7,335,909   16          235,253
General     LinkedMDB                6,147,996    850           694,400     222         2,052,959
            Faceted DBLP             60,139,734   9,799         3,591,091   27          25,154,979
            Dbpedia 3-8              431,440,396  63,053        24,791,728  57,986      108,927,201

Table 1. Description of the evaluation framework.

and (ii) all literal values of a given predicate are of the same data type (float,
string, dateTime, etc.). We refer to this assumption as consistent predicates. Although this is common for data streams and other well-structured datasets, it
is more difficult to find general datasets in which this assumption remains true.
Thus, we set a parameter in Presets to allow or disallow consistent predicates.
Regarding potential channel compressions, Figure 4 includes some standard
compression techniques and tools for each type of data. In practice, to simplify
the following evaluation, our ERI processor uses Zlib whenever textual information is present, i.e. in the main terms of molecules, new structures, new predicates
and new terms channels. As for those channels managing IDs, each ID is encoded
with log(n) bits, n being the maximum ID in the current channel.
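As an illustration of this fixed-width ID encoding, here is a byte-aligned sketch (the actual implementation may pack bits more tightly; function names are ours):

```python
def pack_ids(ids):
    """Encode a channel of IDs with a fixed width of bit_length(max ID)
    bits each, i.e. roughly log2(n) bits per ID, padding the bit string
    to a whole number of bytes."""
    width = max(1, max(ids).bit_length())
    bits = "".join(format(i, "0{}b".format(width)) for i in ids)
    bits += "0" * (-len(bits) % 8)  # pad to a byte boundary
    return width, bytes(int(bits[k:k + 8], 2) for k in range(0, len(bits), 8))

def unpack_ids(width, data, count):
    """Decode `count` IDs packed with the given bit width."""
    bits = "".join(format(b, "08b") for b in data)
    return [int(bits[k * width:(k + 1) * width], 2) for k in range(count)]
```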
4 Evaluation
We implemented a first prototype of an ERI processor in Java, following the
aforementioned practical decisions. We used some tools provided by the HDT-Java library 1.1.26, and the default Deflater compressor provided by Zlib. Tests
were performed on a computer with an Intel Xeon X5675 processor at 3.07
GHz and 48 GB of RAM, running Ubuntu/Precise 12.04.2 LTS. The network is
regarded as an ideal communication channel for a fair comparison.
4.1 Datasets
Table 1 lists our experimental datasets7, reporting: number of triples, size in N-Triples (Nt hereinafter) format, and the different number of subjects, predicates
and objects. We choose representative datasets based on the number of triples,
topic coverage, availability and, if possible, previous uses in benchmarking.
6 https://code.google.com/p/hdt-java
7 We have set up a Research Object with all the datasets as well as the prototype source code at http://purl.org/net/ro-eri-ISWC14.
We define three different categories of datasets: streaming (10), statistics (3)
and general (3). Obviously, streaming datasets are our main application focus;
the first six datasets in Table 1 have already been used in the evaluation of RDSZ
[10] and correspond to RDF messages in the public timeline of a microblogging site (Identica), Wikipedia edit monitoring (Wikipedia), information from
weather stations in Spain (AEMET-1 and AEMET-2), credit card transactions in
petrol stations (Petrol) and a random mix of these datasets (Mix). We complete
the corpora with information of media events (e.g. concerts and other performances) in Flickr (Flickr Event Media), and weather measurements of a blizzard
(LOD Nevada) and two hurricanes (LOD Charley and LOD Katrina) extracted
from the Linked Observation Data dataset which is the core of SRBench [20].
Statistical datasets are the prototypical case of other (non-streaming)
data presenting clear regularities that ERI can take advantage of. We consider three datasets8 (Eurostat migr-reschange, Eurostat tour cap nuts3 and Eurostat avia paexac) using the RDF Data Cube Vocabulary [7], providing population, tourism and transport statistics respectively.
Finally, we experiment with general static datasets, without prior assumptions on data regularities. We use well-known datasets in the domains of films
(LinkedMDB) and bibliography (Faceted DBLP), as well as Dbpedia 3-8.
4.2 Compactness Results
ERI allows multiple configurations for encoding, providing different space/time
tradeoffs for different scenarios. In this section we focus on evaluating three different configurations: ERI-1K (blockSize = 1024), ERI-4k (blockSize = 4096) and
ERI-4k-Nodict (blockSize = 4096). ERI-1K and ERI-4k include an LRU dictionary
for each value channel, whereas ERI-4k-Nodict does not. We allow the consistent
predicates option (i.e. we save datatype tag repetitions) in all datasets except
for the Mix dataset and the whole general category, in which the aforementioned
assumption is not satisfied. In turn, we manually define a set of common discrete predicates in Presets. Finally, according to previous experiences [10], the
blockSize selection introduces a tradeoff between space and delays: the bigger
the blocks, the more regular structures can be found. This implies better compression results, but longer waiting times in the decoder. Based on this, we
select two configurations, 1K and 4K triples, providing different tradeoffs.
We compare our proposal with diverse streaming compression techniques.
Table 2 analyzes the compression performance, providing compression ratios as
Compressed size / Original size, taking Nt as the original size. First, we test standard deflate
over Nt (Nt Deflate-4K), flushing the compression internal buffer every 4096
triples, and over the Turtle (TTL Deflate) serialization9 in the best scenario
of compressing the complete dataset at once. We also test the RDSZ approach
which is focused on compressing streams of RDF graphs, whereas ERI considers
continuous flows of RDF triples. Thus, the evaluation with RDSZ is limited to
8 Taken from Eurostat-Linked Data, http://eurostat.linked-statistics.org/.
9 For the conversion process we use Any23 0.9.0, http://any23.apache.org/.
Compression Ratio

Dataset                  Nt Deflate-4K  TTL Deflate  ERI-4k  ERI-4k-Nodict  RDSZ  HDT-4K  HDT
Mix                      8.2%           5.1%         5.2%    5.1%           4.9%  10.6%   7.6%
Identica                 11.0%          8.5%         8.4%    8.0%           8.7%  16.4%   13.6%
Wikipedia                10.5%          7.5%         7.5%    7.7%           7.2%  13.4%   10.9%
AEMET-1                  4.1%           1.5%         1.2%    0.8%           1.3%  4.4%    2.9%
AEMET-2                  2.8%           1.1%         1.1%    1.1%           1.1%  3.8%    3.8%
Petrol                   6.5%           3.8%         2.9%    2.6%           3.9%  9.9%    5.2%
Flickr Event Media       9.0%           6.9%         6.6%    6.3%           6.6%  14.4%   7.2%
LOD Nevada               3.2%           1.3%         1.5%    1.3%           1.2%  4.9%    3.2%
LOD Charley              3.1%           1.3%         1.4%    1.2%           1.2%  4.9%    3.2%
LOD Katrina              3.1%           1.3%         1.4%    1.2%           1.2%  5.0%    3.2%
Eurostat migr.           2.1%           0.5%         0.5%    0.5%           -     2.6%    2.5%
Eurostat tour.           2.2%           0.6%         0.5%    0.6%           -     2.6%    2.5%
Eurostat avia paexac     2.2%           0.6%         0.6%    0.6%           -     3.2%    2.6%
LinkedMDB                4.7%           2.9%         3.1%    2.6%           -     9.5%    5.9%
Faceted DBLP             5.4%           3.7%         4.0%    3.5%           -     11.3%   9.2%
Dbpedia 3-8              8.0%           6.4%         8.0%    7.5%           -     16.0%   8.0%

Table 2. Compression results on the different datasets.
streaming datasets (the first category in the table), for which we can configure
the RDSZ input as a set of Turtle graphs merged together (the input expected
by the RDSZ prototype), one per original graph in the dataset. The results of
RDSZ depend on two configuration parameters: we use batchSize=5 and cacheSize=100, the default configuration in [10]. For a fair comparison, we consider
the original size in Nt in the reported RDSZ compression results. To complete
the comparison, we evaluate the HDT serialization, although it works on complete
datasets. Thus, we also analyze HDT on partitions of 4096 triples (HDT-4k).
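The Nt Deflate-4K baseline can be reproduced with standard zlib by sync-flushing the compressor every 4096 triples. The sketch below is ours, with synthetic data; the function name is an assumption for illustration.

```python
import zlib

# Sketch (ours) of the Nt Deflate-4K baseline: Deflate over an N-Triples
# stream, flushing the compressor's internal buffer every 4096 triples so
# that each block becomes decodable as soon as it is received.
BLOCK = 4096

def nt_deflate_4k(nt_lines):
    comp = zlib.compressobj()
    out = bytearray()
    for i, line in enumerate(nt_lines, 1):
        out += comp.compress(line.encode() + b"\n")
        if i % BLOCK == 0:
            out += comp.flush(zlib.Z_SYNC_FLUSH)  # emit the pending block
    out += comp.flush(zlib.Z_FINISH)
    return bytes(out)

# Compression ratio as reported in Table 2: compressed size / original size.
lines = ['<s%d> <p> "v%d" .' % (i % 100, i % 10) for i in range(10000)]
original_size = sum(len(l) + 1 for l in lines)
ratio = len(nt_deflate_4k(lines)) / original_size
```

The Z_SYNC_FLUSH calls are what distinguish this baseline from compressing the whole dataset at once: they bound the decoder's latency at the cost of some compression.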
The results show varied compression ratios across categories and datasets. The considered statistical datasets are much more compressible
than the rest: besides structural repetitions, they include few distinct objects (see Table 1) repeated throughout the dataset.
As can be seen, ERI excels in space for streaming and statistical datasets. As
expected, it clearly outperforms Nt compression (up to 5 times) thanks to the
molecule grouping. This grouping is somehow also exploited by Turtle, which
natively groups items with the same subject. Thus, the deflate compression over
Turtle can also take advantage of datasets in which predicates and values are
repeated within the same compression context. In turn, ERI clearly outperforms
Turtle compression (up to 1.9 times) in those datasets in which the repetitions in
structures and values are distributed across the stream (e.g. Petrol and Identica).
Similar reasoning applies to the slightly different results reported by
ERI-4k and ERI-4k-Nodict. As can be seen, the presence of the object dictionary can add overhead to the representation, although it always obtains comparable
compression ratios. Note that, as ERI groups the objects by predicate within
each block, ERI-4k-Nodict using Zlib can already take advantage of the redundancies in objects whenever these repetitions are present in the same block. In
turn, ERI-4k slightly improves on ERI-4k-Nodict in those cases (such as statistical
datasets) in which the object repetitions are distributed across different blocks.
RDSZ remains comparable to our approach. ERI outperforms RDSZ in those
datasets in which the division in graphs of the input fails to group redundancies
in compression contexts. In contrast, the RDSZ compression slightly outperforms ERI in two particular cases of interest: Mix, where the information is
randomly distributed, and the simple Wikipedia dataset, where only 3 predicates are present. In such cases, ERI pays the cost of having several compression
channels and thus flushing the metadata of several compressors (in contrast to
one compressor in RDSZ). An alternative, not exploited in the present
proposal, is to group channels and use one compressor per group of channels.
A similar decision was taken in EXI [18].

Fig. 5. Analysis of compression results of the considered ERI configurations.
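The metadata cost of running one compressor per channel can be illustrated with plain zlib. The snippet is our illustration, not the ERI code; the channel split is an arbitrary round-robin over synthetic values.

```python
import zlib

# Illustration (ours) of the per-channel compressor overhead discussed above:
# each independent zlib stream pays its own header/trailer and loses
# cross-channel redundancy, which hurts on small inputs.
values = ["sensor-%d:%d" % (i % 3, i) for i in range(300)]

# One compressor over everything (a single compression context, as in RDSZ).
single_size = len(zlib.compress("\n".join(values).encode()))

# One compressor per "channel": a round-robin split into 10 channels.
channels = [values[i::10] for i in range(10)]
multi_size = sum(len(zlib.compress("\n".join(c).encode())) for c in channels)
# On inputs this small, the multi-channel total is typically larger.
```

As the data per channel grows, the fixed per-stream cost is amortized and the channel-local redundancy starts to pay off, which matches ERI's behavior on the larger datasets.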
As for general data, the LinkedMDB and Faceted DBLP datasets provide well-structured information, and thus ERI can also take advantage of repetitive predicate structures, obtaining the best compression as well. As expected, ERI
loses efficiency on a dataset with very diverse information and structures such
as Dbpedia. Nonetheless, Turtle compression is the only competitor in this case.
As expected, HDT is built for a different type of scenario and the results are
not competitive w.r.t. ERI. Although the compression of the full dataset with HDT
improves the compression by blocks (HDT-4k), it remains far from ERI efficiency.
Figure 5 compares the compression ratio of the three ERI configurations that
have been considered. As expected, the smaller buffer in ERI-1k slightly hurts
efficiency: the more blocks, the more additional control information is needed and
the smaller the regularities that can be found and compressed. The comparison between
ERI-4k and ERI-4k-Nodict is consistent with the results in Table 2 and the
aforementioned analysis of the object dictionary overhead.
4.3 Processing Efficiency Results
In this section we measure the processing time of ERI, reporting elapsed times
(in seconds) for all experiments, averaging five independent executions.
First, we compare the compression and decompression times of ERI against RDSZ.
Table 3 reports the results for the streaming datasets (to which RDSZ applies),
                        Compression Time (sec.)        Decompression Time (sec.)
Dataset                RDSZ     ERI-4k  ERI-4k-Nodict    RDSZ     ERI-4k  ERI-4k-Nodict
Mix                      2.5       1.8       1.2           0.5       1.4       1.2
Identica                 8.4       3.1       2.1           0.8       2.9       2.1
Wikipedia                3.8       2.8       2.2           2.7       3.2       2.7
AEMET-1                 17.9       4.3       4.6           3.7       6.3       5.1
AEMET-2                 95.7      15.7      12.5           4.8      20.3      16.8
Petrol                 149.9      13.4      11.8           6.7      16.8      20.4
Flickr Event Media   1,141.8     262.4     207.2         204.0     311.7     388.2
LOD Nevada             534.7     329.9     208.3         428.2     191.8     218.3
LOD Charley          1,388.9     663.6     501.4       1,115.7     600.6     611.1
LOD Katrina          2,315.7   1,002.5     822.0       1,869.6   1,038.0     890.0

Table 3. Compression and decompression times comparing ERI and RDSZ.
comparing ERI-4k and ERI-4k-Nodict. As can be seen, ERI always outperforms RDSZ in compression time (3 and 3.8 times faster on average for ERI-4k
and ERI-4k-Nodict, respectively). In contrast, ERI decompression is commonly
slower (1.4 times on average in both ERI configurations). Note again that RDSZ
processes and outputs streams of graphs, whereas ERI manages a stream of
triples. Thus, RDSZ compression can be affected by the fact that it has to
process potentially large graphs with many triples (as is the case of the LOD
datasets), hence the differential encoding process takes a longer time. In contrast,
ERI compresses blocks of the same size. In turn, ERI decompression is generally
slower as it decompresses several channels and outputs all triples in the block.
In those datasets in which the number of value channels is very small (Wikipedia
with three predicates and LOD datasets with many discrete predicates), ERI
decompression is comparable or even slightly better than RDSZ.
As expected, the object dictionary in ERI-4k degrades performance with respect
to ERI-4k-Nodict, since the dictionary has to be created during compression and
continuously updated during decompression. Decompression is faster in ERI-4k
when there are series of triples in which the dictionary does not change.
Next, we test an application that uses ERI for compression, exchange, and
consumption. We assume hereinafter an average unicast transmission
speed of 1 MByte/s. Although applications could work over faster channels, there
is a wide range of scenarios, such as sensor networks, where
transmission is much slower, limited, or costly. In the following, we only
focus on streams of RDF triples. Thus, we exclude RDSZ (which manages streams
of graphs) and Turtle (which groups human-readable triples by default), and we
establish compressed Nt as the baseline for a fair comparison.
We first focus on transmission and decompression, decoupling the compression process, since many scenarios allow the information to be compressed beforehand. Thus, we measure the parsing throughput provided to the client of the
transmission, i.e. the number of triples parsed per time unit. In turn, the total
time includes the exchange time of the considered network, the decompression
time and the parsing process to obtain the components (subject, predicate and
object) of each triple. Figure 6 reports the average results over the corpora, in
triples per second, comparing ERI-4k and ERI-4k-Nodict against the baseline
NT-Deflate-Blocks-4K. As can be seen, both ERI-4k and ERI-4k-Nodict outperform the baseline in most cases, except for those datasets with fewer regularities
in the structure or the data values, which is in line with the previous compression
results. This is the case for the general datasets as well as two streaming datasets
(Wikipedia and Flickr Event Media), in which most objects are unrepeated (as can
be seen in Table 1). On average, the gains in parsing throughput for both ERI
configurations are 110.64% and 142.36% for streaming and statistical datasets
respectively, whereas they only decrease to 99.65% for general datasets.

Fig. 6. Analysis of Parsing (Exchange+Decompressing) throughput.
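The parsing-throughput figures above follow a simple model: total time is exchange time over the assumed 1 MByte/s link plus decompression and parsing time. A sketch with made-up numbers (the function name and the example values are ours):

```python
# Throughput model behind the parsing experiments (our formulation): the
# client-side cost of a block is exchange + decompression + parsing, and
# throughput is the number of triples delivered per second of that total.
LINK_BYTES_PER_SEC = 1_000_000  # the assumed 1 MByte/s unicast channel

def parsing_throughput(n_triples, compressed_bytes, decomp_secs, parse_secs):
    exchange_secs = compressed_bytes / LINK_BYTES_PER_SEC
    return n_triples / (exchange_secs + decomp_secs + parse_secs)

# Hypothetical block: 4096 triples, 20 KB compressed, 5 ms to decompress,
# 10 ms to parse -> roughly 117,000 triples/s reach the client.
tput = parsing_throughput(4096, 20_000, 0.005, 0.010)
```

The model makes explicit why smaller compressed sizes dominate on slow links: halving the compressed bytes shrinks the exchange term directly, which is where ERI's space savings turn into throughput gains.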
Finally, we address a scenario where compression is followed by transmission
and decompression (including parsing the final output to obtain each triple).
Figure 7 compares the resulting times (in logarithmic scale) of ERI-4k against
the baseline NT-Deflate-Blocks-4K. We choose ERI-4k over ERI-4k-Nodict because
the former produces bigger sizes and worse parsing throughput, hence we are
comparing our worst case against the baseline. Under these conditions, ERI-4k
suffers an expected overhead, given that we always include the time to process
and compress the information in ERI, whereas the baseline directly compresses
the information. Nevertheless, the time in which the client receives all data in
ERI is comparable to the baseline even in this worst case (ERI-4k-Nodict performs
better than ERI-4k, as stated above), while retaining the aforementioned
improvement in parsing throughput (as shown in Figure 6). In turn, the huge
savings in the statistical datasets make ERI slightly faster than the baseline.
5 Conclusions and Future Work
In this paper we have focused on compression as a way to minimize transmission
costs in RDF stream processing. In particular, we propose the ERI format, which
leverages the inherent structural and data redundancy that is common in RDF
streams, especially those using the W3C Semantic Sensor Network Ontology.
Fig. 7. Comparison of processing performance, ERI-4K against NT-Deflate-Blocks-4k.

ERI groups triples into information units called molecules, which are encoded
into two types of channels: structural channels, referencing the structure of each
molecule by means of a dynamic dictionary of structures, and value channels,
grouping and compressing together all the property values by predicate. We
provide insights on the flexible and extensible ERI configurations and present a
practical implementation that is empirically evaluated.
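The molecule/channel split summarized above can be sketched in a few lines. The snippet is ours, with toy triples; the real ERI encoding adds per-channel dictionaries and compression on top of this split.

```python
from collections import defaultdict

# Toy sketch (ours) of the ERI split described above: triples sharing a
# subject form a molecule, the molecule's predicate "shape" is looked up in a
# dynamic dictionary of structures, and objects go to per-predicate channels.
triples = [
    ("obs1", "temp", "20.5"), ("obs1", "unit", "Cel"),
    ("obs2", "temp", "21.0"), ("obs2", "unit", "Cel"),
]

molecules = defaultdict(list)           # subject -> [(predicate, object)]
for s, p, o in triples:
    molecules[s].append((p, o))

structures = {}                         # shape -> id (dictionary of structures)
structural_channel = []                 # one structure id per molecule
value_channels = defaultdict(list)      # predicate -> objects, in order

for pairs in molecules.values():
    shape = tuple(p for p, _ in pairs)
    structural_channel.append(structures.setdefault(shape, len(structures)))
    for p, o in pairs:
        value_channels[p].append(o)

# Both molecules share the shape ("temp", "unit"), so it is stored only once.
```

Regular streams, such as sensor observations, reuse a handful of shapes, so the structural channel degenerates into a highly compressible run of small IDs while each value channel holds homogeneous, often repetitive objects.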
Experiments show that ERI produces state-of-the-art compression for RDF
streams, and that it excels for regularly-structured static RDF datasets (e.g., statistical datasets) while remaining competitive on general datasets. Time overheads for
ERI processing are relatively low and acceptable in many scenarios.
Our next plans focus on integrating ERI within the next version of morph-streams [3], with the purpose of scaling to higher input data rates, minimizing
data exchange among processing nodes, and serving a small set of retrieval features on the compressed data. This will come together with other new features,
including an adaptive query processor that is aware of the compression dimension when applying optimization strategies. Besides, we expect to improve the performance of ERI management by exploring parallel compression/decompression
and the use of caches and other fast compressors besides Zlib.
Acknowledgments
This research has been funded by the European Commission under the grant
PlanetData (FP7-257641) and by Ministerio de Economía y Competitividad
(Spain) under the project "4V: Volumen, Velocidad, Variedad y Validez en la
Gestión Innovadora de Datos" (TIN2013-46238-C4-2-R). We are thankful for discussions with the authors of the RDSZ approach, especially with Norberto Fernández.
References
1. J. Arias, N. Fernandez, L. Sanchez, and D. Fuentes-Lorenzo. Ztreamy: A middleware for publishing semantic streams on the Web. Web Semantics: Science,
Services and Agents on the World Wide Web, 25:16–23, 2014.
2. D. F. Barbieri, D. Braga, S. Ceri, and M. Grossniklaus. An execution environment
for C-SPARQL queries. In Proc. of the International Conference on Extending
Database Technology (EDBT), pages 441–452. ACM, 2010.
3. J. P. Calbimonte, H. Jeung, O. Corcho, and K. Aberer. Enabling query technologies for the semantic sensor web. International Journal on Semantic Web and
Information Systems, 8(1):43–63, 2012.
4. J.P. Calbimonte, O. Corcho, and A. J. G. Gray. Enabling ontology-based access
to streaming data sources. In Proc. of the International Semantic Web Conference
(ISWC), pages 96–111. Springer, 2010.
5. M. Compton, P. Barnaghi, L. Bermudez, et al. The SSN Ontology of the W3C
Semantic Sensor Network Incubator Group. Web Semantics: Science, Services
and Agents on the World Wide Web, 17(0), 2012.
6. G. Correndo, M. Salvadores, I. Millard, and N. Shadbolt. Linked Timelines: Temporal Representation and Management in Linked Data. In Proc. of Workshop on
Consuming Linked Data (COLD), volume CEUR-WS 665, paper 7. 2010.
7. R. Cyganiak, D. Reynolds, and J. Tennison. The RDF Data Cube Vocabulary.
W3C Recommendation, 16 January 2014.
8. P. Deutsch and J-L. Gailly. ZLIB Compressed Data Format Specification version 3.3. Internet RFC 1950, May 1996.
9. J.D. Fernández, M.A. Martínez-Prieto, C. Gutiérrez, A. Polleres, and M. Arias. Binary RDF Representation for Publication and Exchange (HDT). Journal of Web Semantics, 19:22–41, 2013.
10. N. Fernández, J. Arias, L. Sanchez, D. Fuentes-Lorenzo, and O. Corcho. RDSZ: An
approach for lossless RDF stream compression. In Proc. of the Extended Semantic
Web Conference (ESWC), volume LNCS 8465, pages 52–67, 2014.
11. F. Grandi. Introducing an Annotated Bibliography on Temporal and Evolution
Aspects in the Semantic Web. In SIGMOD Rec., volume 41, pages 18–21. 2013.
12. C. Gutiérrez, C. Hurtado, and A. Vaisman. Temporal RDF. In The Semantic Web:
Research and Applications, volume LNCS 3532, pages 93–107. Springer Berlin Heidelberg, 2005.
13. H. Hasemann, A. Kroller, and M. Pagel. RDF Provisioning for the Internet of
Things. In Proc. of the International Conference on the Internet of Things (IOT),
pages 143–150. IEEE, 2012.
14. A. K. Joshi, P. Hitzler, and G. Dong. Logical Linked Data Compression. In The
Semantic Web: Semantics and Big Data, volume LNCS 7882, pages 170–184. 2013.
15. D. Le-Phuoc, H. Q. Nguyen-Mau, C. Le Van, and M. Hauswirth. Elastic and
Scalable Processing of Linked Stream Data in the Cloud. In The Semantic Web - ISWC 2013, volume LNCS 8218, pages 280–297. Springer Berlin Heidelberg, 2013.
16. A. Margara, J. Urbani, F. van Harmelen, and H. Bal. Streaming the Web: Reasoning
over Dynamic Data. Web Semantics, 25, 2014.
17. E. Prud'hommeaux and G. Carothers. RDF 1.1 Turtle: Terse RDF Triple Language.
W3C Recommendation, 25 February 2014.
18. J. Schneider, T. Kamiya, D. Peintner, and R. Kyusakov. Efficient XML Interchange
(EXI) Format 1.0 (Second Edition). W3C Recommendation, 11 February 2014.
19. J. Urbani, J. Maassen, N. Drost, F. Seinstra, and H. Bal. Scalable RDF data
compression with MapReduce. Concurrency and Computation: Practice and Experience, 25(1):24–39, 2013.
20. Y. Zhang, P. M. Duc, O. Corcho, and J. P. Calbimonte. SRBench: a streaming
RDF/SPARQL benchmark. In The Semantic Web–ISWC 2012, pages 641–657.
Springer, 2012.