Memgraph can connect to existing Kafka streams. To use streams, a user must
- Manage a stream via a query
- Provide a user-defined transformation module
More information about transformation modules can be found here. The rest of this section describes how to manage streams with Memgraph.
The general syntax for creating a stream is:
CREATE STREAM <stream name> TOPICS <topic1> [, <topic2>, ...] TRANSFORM <transform procedure> [CONSUMER_GROUP <consumer group name>] [BATCH_INTERVAL <milliseconds>] [BATCH_SIZE <size>];
STREAM with name
<stream name> that consumes messages from
TOPICS with name
TRANSFORM denotes the user-defined
transformation with name
Additionally, the user can provide the following optional parameters:
<consumer group name>. If not specified, then
mg_consumerwill be used as a consumer group name.
BATCH_INTERVALdenotes the maximum wait time interval for consuming message(s) before calling the transformation procedure with the already received message(s). This value must be greater than zero and is defaulted to 100.
BATCH_SIZEdenotes the total number of messages to wait before calling the transformation procedure with the already received message(s). It must be greater than zero and is defaulted to 1000.
The transformation procedure is called if either the
BATCH_INTERVAL or the
BATCH_SIZE is reached and there is at least one received message.
BATCH_INTERVAL starts when the:
- the stream is started
- the processing of the previous batch is completed
- the previous batch interval ended without receiving any messages
The user who executes the
CREATE query is going to be the owner of the stream.
Authentication and authorization are not supported in Memgraph Community, thus
the owner will always be
Null, and the privileges are not checked in Memgraph
Community. In Memgraph Enterprise the privileges of the owner are used when
executing the queries returned from a transformation, in other words, the
execution of the queries will fail if the owner doesn't have the required
privileges. More information about how the owner affects the stream can be
found in the
DROP STREAM <stream name>;
Drops a stream with name
START STREAM <stream name>;START ALL STREAMS;
Starts a stream (or all streams) with name
When a stream is started, it should resume from the last committed offset. If there is no committed offset for the consumer group, then the largest offset will be used, therefore only the new messages will be consumed.
In stream processing, it is important to have some guarantees about how failures are handled. When connecting an external application such as Memgraph to a Kafka stream, there are two possible ways to handle failures during message processing:
- Every message is processed at least once: the message offsets are committed to the Kafka cluster after the processing is done. This means if the committing fails, then the messages can get processed multiple times.
- Every message is processed at most once: the message offsets are committed to the Kafka cluster right after they are received before the processing is started. This means if the processing fails, then the same messages won't be processed again.
Missing a message can result in missing an edge that would connect two independent components of the graph. Therefore, we think that missing some information is a bigger problem for graphs than having some information duplicated, so we implemented our streams using the at least once semantics, i.e. for every batch of messages the queries returned by the transformations are executed and committed to the database before committing the message offset to the Kafka cluster. However, even though we cannot guarantee exactly once semantics, we tried to minimize the possibility of processing messages multiple times. This means committing the message offsets to the Kafka cluster happens right after the transaction is committed to the database.
STOP STREAM <stream name>;STOP ALL STREAMS;
Stops a stream (or all streams) with name
Shows a list of existing streams with the following information:
- stream name
- list of topics
- consumer group id
- batch interval
- batch size
- transformation procedure name
- the owner of the streams
- whether the stream is running
CHECK STREAM <stream name> [BATCH_LIMIT <count>] [TIMEOUT <milliseconds>] ;
Does a dry-run on stream with name
<stream name> with
<count> number of
batches and returns the result of the transformation: the queries and their
parameters that would be executed in a normal run.
<count> is unspecified, its default value is 1.
<count> batches are processed, the transformation result is returned.
<count> number of batches are not processed within the specified timeout,
then an exception is thrown. This might be caused by not receiving enough
TIMEOUT is measured in milliseconds, and it's defaulted to 30000.
Checking a stream won't commit any offsets.