Table of contents
- Table of contents
- What we are going to build
- Modelling the Road Traffic Data
- Using a RichAsyncFunction to query PostGIS
- Simulating Traffic Congestion Patterns
- Congestion Warnings
- Building the Apache Flink Pipeline
- Debouncing Warnings using a KeyedProcessFunction
- Generating Warnings for the CEP Pattern Matches with a PatternProcessFunction
- Running the Apache Flink Pipeline
- Conclusion
What we are going to build
In the last article we have been importing Open Street Map (OSM) data into a PostGIS database, then we've extracted the road and traffic light data available for the data. The queries are quite efficient by now, so we can query it in a millisecond range.
All code can be found in a Git repository at:
In this article we are going to implement a Traffic Congestion Detection using the OSM data, Apache Flink and a generated mock dataset. We will see how to transform events, query the database, using the CEP for detecting patterns in data and generate warnings.
Modelling the Road Traffic Data
We start by creating a RawTrafficEvent
, which models the data a single vehicle sends to our service. As
you can see it contains a timestamp
the event was recorded at, a vehicleId
as a unique identifier of
the vehicle, the position given as latitude
and longitude
, and finally the speed
.
package de.bytefish.flinkjam.models;
/**
* Represents a raw real-time traffic event from a vehicle, before any enrichment.
*/
public class RawTrafficEvent implements java.io.Serializable {
public long timestamp;
public String vehicleId;
public double latitude;
public double longitude;
public double speed;
public RawTrafficEvent() {
}
public RawTrafficEvent(long timestamp, String vehicleId, double latitude, double longitude, double speed) {
this.timestamp = timestamp;
this.vehicleId = vehicleId;
this.latitude = latitude;
this.longitude = longitude;
this.speed = speed;
}
// Getters
public long getTimestamp() {
return timestamp;
}
public String getVehicleId() {
return vehicleId;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public double getSpeed() {
return speed;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public void setVehicleId(String vehicleId) {
this.vehicleId = vehicleId;
}
public void setLatitude(double latitude) {
this.latitude = latitude;
}
public void setLongitude(double longitude) {
this.longitude = longitude;
}
public void setSpeed(double speed) {
this.speed = speed;
}
@Override
public String toString() {
return "RawTrafficEvent{" +
"timestamp=" + timestamp +
", vehicleId='" + vehicleId + '\'' +
", lat=" + latitude +
", lon=" + longitude +
", speed=" + speed +
'}';
}
}
The idea is to now enrich the data with the road and traffic light information, that we have
extracted in a previous version. I intially thought about sending the road and traffic light
information along with the RawEventData
.
And if you are Apple Maps or Google Maps you are controlling their devices and are based on most recent maps. So I guess they can send along this information reliably and highly simplfy the entire pipeline.
Since we are not Apple or Google, we need to enrich the Event with the road information for
the given GPS position in a RoadEnrichedTrafficEvent
.
package de.bytefish.flinkjam.models;
/**
* Represents a traffic event enriched with road segment data.
*/
public class RoadEnrichedTrafficEvent implements java.io.Serializable {
public long timestamp;
public String vehicleId;
public double latitude;
public double longitude;
public double speed;
public String roadSegmentId;
public int speedLimitKmh;
public String roadType;
public RoadEnrichedTrafficEvent() {
}
public RoadEnrichedTrafficEvent(RawTrafficEvent rawEvent, String roadSegmentId, int speedLimitKmh, String roadType) {
this.timestamp = rawEvent.timestamp;
this.vehicleId = rawEvent.vehicleId;
this.latitude = rawEvent.latitude;
this.longitude = rawEvent.longitude;
this.speed = rawEvent.speed;
this.roadSegmentId = roadSegmentId;
this.speedLimitKmh = speedLimitKmh;
this.roadType = roadType;
}
// Getters (needed for Flink's POJO detection and keying)
public long getTimestamp() {
return timestamp;
}
public String getVehicleId() {
return vehicleId;
}
public String getRoadSegmentId() {
return roadSegmentId;
}
public double getSpeed() {
return speed;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public int getSpeedLimitKmh() {
return speedLimitKmh;
}
public String getRoadType() {
return roadType;
}
@Override
public String toString() {
return "RoadEnrichedTrafficEvent{" +
"timestamp=" + timestamp +
", vehicleId='" + vehicleId + '\'' +
", latitude=" + latitude +
", longitude=" + longitude +
", speed=" + speed +
", roadSegmentId='" + roadSegmentId + '\'' +
", speedLimitKmh=" + speedLimitKmh +
", roadType='" + roadType + '\'' +
'}';
}
}
It's not useful to generate Traffic Warnings if a car is standing at a traffic light in
a city. So we need to take our traffic light data into account and enrich the event with
traffic lights in a FullyEnrichedTrafficEvent
.
package de.bytefish.flinkjam.models;
import java.util.List;
import java.util.Objects;
/**
* Represents a fully enriched traffic event, including road segment and nearby traffic light data.
* This is the event used for CEP.
*/
public class FullyEnrichedTrafficEvent implements java.io.Serializable {
public long timestamp;
public String vehicleId;
public double latitude;
public double longitude;
public double speed;
public String roadSegmentId;
public int speedLimitKmh;
public String roadType;
public List<TrafficLightInfo> nearbyTrafficLights; // List of nearby traffic lights
public FullyEnrichedTrafficEvent() {
}
public FullyEnrichedTrafficEvent(RoadEnrichedTrafficEvent roadEvent, List<TrafficLightInfo> nearbyTrafficLights) {
this.timestamp = roadEvent.timestamp;
this.vehicleId = roadEvent.vehicleId;
this.latitude = roadEvent.latitude;
this.longitude = roadEvent.longitude;
this.speed = roadEvent.speed;
this.roadSegmentId = roadEvent.roadSegmentId;
this.speedLimitKmh = roadEvent.speedLimitKmh;
this.roadType = roadEvent.roadType;
this.nearbyTrafficLights = nearbyTrafficLights;
}
// Getters
public long getTimestamp() {
return timestamp;
}
public String getVehicleId() {
return vehicleId;
}
public String getRoadSegmentId() {
return roadSegmentId;
}
public double getSpeed() {
return speed;
}
public double getLatitude() {
return latitude;
}
public double getLongitude() {
return longitude;
}
public int getSpeedLimitKmh() {
return speedLimitKmh;
}
public String getRoadType() {
return roadType;
}
public List<TrafficLightInfo> getNearbyTrafficLights() {
return nearbyTrafficLights;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FullyEnrichedTrafficEvent that = (FullyEnrichedTrafficEvent) o;
return timestamp == that.timestamp &&
Double.compare(that.speed, speed) == 0 &&
Double.compare(that.latitude, latitude) == 0 &&
Double.compare(that.longitude, longitude) == 0 &&
speedLimitKmh == that.speedLimitKmh &&
Objects.equals(vehicleId, that.vehicleId) &&
Objects.equals(roadSegmentId, that.roadSegmentId) &&
Objects.equals(roadType, that.roadType) &&
Objects.equals(nearbyTrafficLights, that.nearbyTrafficLights);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, vehicleId, latitude, longitude, speed, roadSegmentId, speedLimitKmh, roadType, nearbyTrafficLights);
}
@Override
public String toString() {
return "FullyEnrichedTrafficEvent{" +
"timestamp=" + timestamp +
", vehicleId='" + vehicleId + '\'' +
", latitude=" + latitude +
", longitude=" + longitude +
", speed=" + speed +
", roadSegmentId='" + roadSegmentId + '\'' +
", speedLimitKmh=" + speedLimitKmh +
", roadType='" + roadType + '\'' +
", nearbyTrafficLights=" + nearbyTrafficLights +
'}';
}
}
Using a RichAsyncFunction to query PostGIS
Apache Flink uses a so called RichAsyncFunction
to asynchronously load data from external data sources. We
start by writing a AsyncRoadSegmentLookupFunction
to load the road segments off of our PostGIS database, the
SQL query has been written and optimized in the previous article.
A little complexity is introduced by using HikariCP
as the data source, because we want efficient Connection
Pooling, instead of having the costly task of opening and closing database connections for each and every query.
package de.bytefish.flinkjam.lookup;
// ...
/**
* An Asynchronous Lookup Function for enriching RawTrafficEvents with the road_segment data.
*/
public class AsyncRoadSegmentLookupFunction extends RichAsyncFunction<RawTrafficEvent, RoadEnrichedTrafficEvent> {
private transient ExecutorService executorService;
private transient HikariDataSource dataSource; // HikariCP DataSource
private String dbUrl;
private String dbUser;
private String dbPassword;
private double lookupRadiusMeters;
private static final String LOOKUP_SQL =
"SELECT\n" +
" osm_id, \n" +
" osm_type,\n" +
" road_type,\n" +
" speed_limit_kmh,\n" +
" name,\n" +
" geom <-> ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography as distance\n" +
"FROM\n" +
" road_segments\n" +
"WHERE \n" +
" ST_DWithin(geom, ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography, ?)\n" +
"ORDER BY \n" +
" distance asc \n" +
"LIMIT 1;";
public AsyncRoadSegmentLookupFunction(String dbUrl, String dbUser, String dbPassword, double lookupRadiusMeters) {
this.dbUrl = dbUrl;
this.dbUser = dbUser;
this.dbPassword = dbPassword;
this.lookupRadiusMeters = lookupRadiusMeters;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize HikariCP connection pool
HikariConfig config = new HikariConfig();
config.setJdbcUrl(dbUrl);
config.setUsername(dbUser);
config.setPassword(dbPassword);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.setMinimumIdle(5); // Minimum idle connections
config.setMaximumPoolSize(20); // Maximum total connections (tune this based on DB capacity)
config.setConnectionTimeout(5000); // 5 seconds connection timeout
config.setIdleTimeout(300000); // 5 minutes idle timeout
config.setMaxLifetime(1800000); // 30 minutes max connection lifetime
// Adjust the thread pool size for the AsyncFunction to match or exceed HikariCP's pool size
// A higher number of threads allows more concurrent async DB calls.
executorService = Executors.newFixedThreadPool(config.getMaximumPoolSize() * 2);
dataSource = new HikariDataSource(config);
}
@Override
public void close() throws Exception {
super.close();
if (executorService != null) {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS); // Wait for threads to finish
}
if (dataSource != null) {
dataSource.close(); // Close the connection pool
}
}
@Override
public void asyncInvoke(RawTrafficEvent input, ResultFuture<RoadEnrichedTrafficEvent> resultFuture) throws Exception {
executorService.submit(() -> {
RoadEnrichedTrafficEvent outputEvent = null;
try (Connection connection = dataSource.getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(LOOKUP_SQL)) {
statement.setDouble(1, input.longitude);
statement.setDouble(2, input.latitude);
statement.setDouble(3, input.longitude);
statement.setDouble(4, input.latitude);
statement.setDouble(5, lookupRadiusMeters);
try (ResultSet rs = statement.executeQuery()) {
if (rs.next()) {
String roadSegmentId = rs.getString("osm_id");
String roadType = rs.getString("road_type");
int speedLimit = rs.getInt("speed_limit_kmh");
outputEvent = new RoadEnrichedTrafficEvent(input, roadSegmentId, speedLimit, roadType);
} else {
// Important: Complete the future even if no segment is found
System.out.println("No road segment found for " + input.latitude + ", " + input.longitude + " - " + input.vehicleId);
outputEvent = new RoadEnrichedTrafficEvent(input, null, 0, "Unknown");
}
}
}
resultFuture.complete(Collections.singleton(outputEvent));
} catch (Exception e) { // Catch any exception during lookup (e.g., SQLException)
System.err.println("Error during async road segment lookup for event " + input.vehicleId + ": " + e.getMessage());
resultFuture.completeExceptionally(e); // Ensure future is completed exceptionally
}
});
}
}
We'll do the same for loading nearby traffic lights in the AsyncTrafficLightLookupFunction
function. We load
multiple traffic lights in a given radius, in case our vehicles are congested at an intersection.
package de.bytefish.flinkjam.lookup;
// ...
public class AsyncTrafficLightLookupFunction extends RichAsyncFunction<RoadEnrichedTrafficEvent, FullyEnrichedTrafficEvent> {
private transient ExecutorService executorService;
private transient HikariDataSource dataSource; // HikariCP DataSource
private String dbUrl;
private String dbUser;
private String dbPassword;
private double lookupRadiusMeters;
private static final String LOOKUP_SQL =
"SELECT osm_id, name, ST_Y(geom) AS latitude, ST_X(geom) AS longitude, is_pedestrian_crossing_light, " +
"ST_Distance(geom::geography, ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography) AS distance_meters " +
"FROM traffic_lights " +
"WHERE ST_DWithin(geom::geography, ST_SetSRID(ST_MakePoint(?, ?), 4326)::geography, ?) " + // ? = radius in meters
"ORDER BY distance_meters " +
"LIMIT 3;"; // Limit to top N closest lights
public AsyncTrafficLightLookupFunction(String dbUrl, String dbUser, String dbPassword, double lookupRadiusMeters) {
this.dbUrl = dbUrl;
this.dbUser = dbUser;
this.dbPassword = dbPassword;
this.lookupRadiusMeters = lookupRadiusMeters;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Initialize HikariCP connection pool
HikariConfig config = new HikariConfig();
config.setJdbcUrl(dbUrl);
config.setUsername(dbUser);
config.setPassword(dbPassword);
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.setMinimumIdle(5); // Minimum idle connections
config.setMaximumPoolSize(20); // Maximum total connections (tune this based on DB capacity)
config.setConnectionTimeout(5000); // 5 seconds connection timeout
config.setIdleTimeout(300000); // 5 minutes idle timeout
config.setMaxLifetime(1800000); // 30 minutes max connection lifetime
// Adjust the thread pool size for the AsyncFunction to match or exceed HikariCP's pool size
// A higher number of threads allows more concurrent async DB calls.
executorService = Executors.newFixedThreadPool(config.getMaximumPoolSize() * 2);
dataSource = new HikariDataSource(config);
}
@Override
public void close() throws Exception {
super.close();
if (executorService != null) {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS); // Wait for threads to finish
}
if (dataSource != null) {
dataSource.close(); // Close the connection pool
}
}
@Override
public void asyncInvoke(RoadEnrichedTrafficEvent input, ResultFuture<FullyEnrichedTrafficEvent> resultFuture) throws Exception {
executorService.submit(() -> {
List<TrafficLightInfo> nearbyTrafficLights = new ArrayList<>();
// Open connection within the submitted task to ensure thread safety
try (Connection connection = dataSource.getConnection()) {
try (PreparedStatement statement = connection.prepareStatement(LOOKUP_SQL)) {
statement.setDouble(1, input.longitude); // For ST_MakePoint
statement.setDouble(2, input.latitude); // For ST_MakePoint
statement.setDouble(3, input.longitude); // For ST_DWithin
statement.setDouble(4, input.latitude); // For ST_DWithin
statement.setDouble(5, lookupRadiusMeters); // Radius in meters
try (ResultSet rs = statement.executeQuery()) {
while (rs.next()) {
long id = rs.getLong("osm_id");
String name = rs.getString("name");
double lat = rs.getDouble("latitude");
double lon = rs.getDouble("longitude");
boolean isPedestrian = rs.getBoolean("is_pedestrian_crossing_light");
double distance = rs.getDouble("distance_meters");
nearbyTrafficLights.add(new TrafficLightInfo(id, name, lat, lon, isPedestrian, distance));
}
}
}
FullyEnrichedTrafficEvent outputEvent = new FullyEnrichedTrafficEvent(input, nearbyTrafficLights);
resultFuture.complete(Collections.singleton(outputEvent));
} catch (Exception e) {
System.err.println("SQL Error during async traffic light lookup for event " + input.vehicleId + ": " + e.getMessage());
resultFuture.completeExceptionally(e);
}
});
}
}
Simulating Traffic Congestion Patterns
By using Google Maps I have extracted the GPS position and simulated various congestion patters. The events are published in a one second interval, so you can see them coming in and see patterns building up.
package de.bytefish.flinkjam.sources;
// ...
/**
* A custom Flink SourceFunction to continuously generate simulated RawTrafficEvents.
* This will prevent the Flink job from terminating due to source exhaustion.
*/
public class RawTrafficEventSource implements SourceFunction<RawTrafficEvent> {
private volatile boolean isRunning = true;
public List<RawTrafficEvent> simulatedEvents;
public long currentBaseTimestamp;
public RawTrafficEventSource() {
// Initialize the base simulated events once
simulatedEvents = new ArrayList<>();
// Populate initial simulated events
currentBaseTimestamp = System.currentTimeMillis();
// Simulate normal traffic on A1 initially
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp, "Car001", 52.194443, 7.797556, 90));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp, "Car002", 52.194512, 7.797632, 95));
// Simulate Light Slowdown on A1 (Exit FMO Airport) (e.g., speed below 70%, 3+ events, 2+ unique vehicles)
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 5000, "Car001", 52.115703, 7.702582, 65));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 10000, "Car002", 52.115849, 7.702784, 60));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 15000, "Car003", 52.115611, 7.702527, 68));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 20000, "Car001", 52.116178, 7.703204, 62));
// Simulate Sustained Slowdown on A1 (e.g., speed below 50%, 5+ events, 3+ unique vehicles)
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 25000, "Car008", 52.041431, 7.607906, 45));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 30000, "Car009", 52.041487, 7.607935, 40));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 35000, "Car010", 52.041200, 7.607768, 42));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 40000, "Car008", 52.041744, 7.608094, 35));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 45000, "Car009", 52.041499, 7.608008, 38));
// Simulate Traffic Jam (e.g., speed below 10 km/h, 7+ events, 5+ unique vehicles)
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 50000, "TruckX", 52.006419, 7.575624, 8));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 53000, "CarY", 52.006460, 7.575661, 5));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 56000, "BusZ", 52.006511, 7.575714, 3));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 59000, "MotoA", 52.006555, 7.575762, 1));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 62000, "VanB", 52.006612, 7.575814, 7));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 65000, "TruckX", 52.006687, 7.575893, 2));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 68000, "CarY", 52.006760, 7.575959, 4));
// Simulate a scenario that's just a long red light (high density, low speed, but expected)
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 78000, "CarD", 51.966097, 7.623803, 2));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 81000, "CarE", 51.966113, 7.623785, 0));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 84000, "CarF", 51.966138, 7.623783, 1));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 87000, "CarG", 51.966162, 7.623780, 0));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 90000, "CarH", 51.966184, 7.623771, 3));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 93000, "CarI", 51.966092, 7.623775, 0));
simulatedEvents.add(new RawTrafficEvent(currentBaseTimestamp + 96000, "CarJ", 51.966217, 7.623759, 2));
}
@Override
public void run(SourceContext<RawTrafficEvent> ctx) throws Exception {
int eventIndex = 0;
// Define a delay between each event emission to simulate real-time data
long delayBetweenEventsMillis = 1000; // 1 second between events
while (isRunning) {
if (eventIndex >= simulatedEvents.size()) {
// Loop back to the beginning of the simulated data
eventIndex = 0;
// Optionally adjust timestamps for the next loop to keep them increasing
currentBaseTimestamp = System.currentTimeMillis();
}
RawTrafficEvent originalEvent = simulatedEvents.get(eventIndex);
// Create a new event to update its timestamp
RawTrafficEvent eventToSend = new RawTrafficEvent(
currentBaseTimestamp + (originalEvent.timestamp - simulatedEvents.get(0).timestamp),
originalEvent.vehicleId,
originalEvent.latitude,
originalEvent.longitude,
originalEvent.speed
);
ctx.collect(eventToSend);
eventIndex++;
try {
Thread.sleep(delayBetweenEventsMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
isRunning = false; // Stop if interrupted
System.err.println("RawTrafficEventSource interrupted during sleep.");
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
Congestion Warnings
We are going to use the Apache Flink CEP to generate CongestionWarnings
. This CongestionWarning
is going to be the
same for every Traffic Event, such as Traffic Jam
, Sustained Slowdown
, ... It also includes the related traffic
lights in case we have encountered a Traffic Jam, that's related to an intersection.
package de.bytefish.flinkjam.models;
import java.util.List;
/**
* Represents a detected congestion warning.
*/
public class CongestionWarning implements java.io.Serializable {
public long timestamp;
public String warningType;
public String roadSegmentId;
public double currentAverageSpeed;
public int speedLimit;
public String details;
public List<TrafficLightInfo> relatedTrafficLights; // Can include lights related to the congestion
public int affectedUniqueVehiclesCount;
public CongestionWarning() {
}
public CongestionWarning(long timestamp, String roadSegmentId, String warningType, double currentAverageSpeed, int speedLimit, String details, List<TrafficLightInfo> relatedTrafficLights, int affectedUniqueVehiclesCount) {
this.timestamp = timestamp;
this.roadSegmentId = roadSegmentId;
this.warningType = warningType;
this.currentAverageSpeed = currentAverageSpeed;
this.speedLimit = speedLimit;
this.details = details;
this.relatedTrafficLights = relatedTrafficLights;
this.affectedUniqueVehiclesCount = affectedUniqueVehiclesCount;
}
public long getTimestamp() {
return timestamp;
}
public String getWarningType() {
return warningType;
}
public String getRoadSegmentId() {
return roadSegmentId;
}
public double getCurrentAverageSpeed() {
return currentAverageSpeed;
}
public int getSpeedLimit() {
return speedLimit;
}
public String getDetails() {
return details;
}
public List<TrafficLightInfo> getRelatedTrafficLights() {
return relatedTrafficLights;
}
public int getAffectedUniqueVehiclesCount() {
return affectedUniqueVehiclesCount;
}
@Override
public String toString() {
return "!!! CONGESTION ALERT !!! " +
"Timestamp=" + timestamp +
", RoadSegmentId='" + roadSegmentId + '\'' +
", Type='" + warningType + '\'' +
", AvgSpeed=" + String.format("%.2f", currentAverageSpeed) + " km/h" +
", SpeedLimit=" + speedLimit + " km/h" +
", Details='" + details + '\'' +
", UniqueVehicles=" + affectedUniqueVehiclesCount +
", RelatedLights=" + relatedTrafficLights;
}
}
Building the Apache Flink Pipeline
I usually don't want my articles to just be "dumping code to the article". So I have long thought about how to show iteratively building a pipeline. But it's too long, so the code has been commented a lot instead.
It basically starts with a TrafficCongestionConfig
, which is going to hold a configuration for congestion
patterns. All patterns are somewhat similar, so it's not useful to break them up into a dozens of
configurations.
We then start streaming the data with our previously defined RawTrafficEventSource
and enrich the
RawTrafficEvent
using the AsyncRoadSegmentLookupFunction
and AsyncTrafficLightLookupFunction
.
Once we have built the FullyEnrichedTrafficEvent
, we apply the various congestion patterns on them,
that are using the TrafficCongestionConfig
to define relative and absolute thresholds. In Apache Flink
a PatternProcessFunction
is then used to convert the matched pattern into a CongestionWarning
.
This generates the CongestionWarnings
just fine. It's generating them so fine, that way too many
warnings are generated. And it's logical: Think how the patterns are applied: It matches the
Patterns for every incoming event, thus each of them matches.
And what about a Traffic Jam, that slowly fades out? We don't want to generate the events in a very rapid succession, because with a "Stop-And-Go" you would quickly alternate like... "Traffic Jam", "Sustained Slowdown", "Traffic Jam", "Sustained Slowdown". People watching such a map probably go crazy.
So we also need to debounce the warnings introducing both an "Warning Event Hierarchy" and a debounce
period, that avoids emitting millions of events, while traffic is building up of fading out. All this
happens in the WarningDebouncerFunction
.
package de.bytefish.flinkjam;
// ...
public class TrafficCongestionWarningSystem {
private static final String DB_URL = "jdbc:postgresql://localhost:5432/flinkjam";
private static final String DB_USER = "postgis";
private static final String DB_PASSWORD = "postgis";
/**
* A Configuration used in Traffic Congestions Patterns.
*/
public static class TrafficCongestionConfig implements Serializable { // Implement Serializable
public int minEvents;
public double speedThresholdFactor; // Speed < factor of speed limit
public long windowMillis; // Changed to long for serializability
public int minUniqueVehicles;
public int trafficLightNearbyRadiusInMeters;
public double absoluteSpeedThresholdKmh; // Absolute speed < this value
public TrafficCongestionConfig(int minEvents, double speedThresholdFactor, double absoluteSpeedThresholdKmh, Duration window, int minUniqueVehicles, int trafficLightNearbyRadiusInMeters) {
this.minEvents = minEvents;
this.speedThresholdFactor = speedThresholdFactor;
this.absoluteSpeedThresholdKmh = absoluteSpeedThresholdKmh;
this.windowMillis = window.toMillis(); // Store as milliseconds
this.minUniqueVehicles = minUniqueVehicles;
this.trafficLightNearbyRadiusInMeters = trafficLightNearbyRadiusInMeters;
}
public int getMinEvents() {
return minEvents;
}
public double getSpeedThresholdFactor() {
return speedThresholdFactor;
}
public long getWindowMillis() {
return windowMillis;
}
public int getMinUniqueVehicles() {
return minUniqueVehicles;
}
public int getTrafficLightNearbyRadiusInMeters() {
return trafficLightNearbyRadiusInMeters;
}
public double getAbsoluteSpeedThresholdKmh() {
return absoluteSpeedThresholdKmh;
}
}
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// We define a WatermarkStrategy for all event streams. forBoundedOutOfOrderness is a common choice for
// real-time applications, allowing for some events to arrive out of order. We allow 5 seconds
// out-of-orderness.
WatermarkStrategy<RawTrafficEvent> rawEventWmStrategy = WatermarkStrategy
.<RawTrafficEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, recordTimestamp) -> event.timestamp);
// The simulation should include the following types: Normal Traffic, Light Slowdown, Sustained Slowdown,
// Traffic Jam, Red Light Phase. We don't have access to a real data source, so we generate our own
// traffic events and feed them to Apache Flink using a SourceFunction<>.
DataStream<RawTrafficEvent> rawTrafficEvents = env.addSource(new RawTrafficEventSource())
.assignTimestampsAndWatermarks(rawEventWmStrategy); // Apply watermark strategy
// Now we need to enrich the raw events with the road segment data, so we get the speed limit and road_segment,
// the possible congestion is on. This is done using the RichAsyncFunction, with 10m as the Lookup Function
// for Road Segments.
DataStream<RoadEnrichedTrafficEvent> roadEnrichedEvents = AsyncDataStream.unorderedWait(
rawTrafficEvents,
new AsyncRoadSegmentLookupFunction(DB_URL, DB_USER, DB_PASSWORD, 10),
5000, TimeUnit.MILLISECONDS, 100
).filter(event -> event.roadSegmentId != null);
roadEnrichedEvents.print("Road Enriched Event");
// Next is enriching the events with the TrafficLightInformation. This needs to be done, because we don't
// want to generate dozens of false positives, if cars are in a red light phase. If we don't take traffic
// lights into consideration, each red light phase would generate an event.
DataStream<FullyEnrichedTrafficEvent> fullyEnrichedEvents = AsyncDataStream.unorderedWait(
roadEnrichedEvents,
new AsyncTrafficLightLookupFunction(DB_URL, DB_USER, DB_PASSWORD, 20),
500, TimeUnit.MILLISECONDS, 100 // Increased timeout to 15 seconds
);
fullyEnrichedEvents.print("Fully Enriched Event");
// Watermark Strategy for FullyEnrichedTrafficEvents
WatermarkStrategy<FullyEnrichedTrafficEvent> fullyEnrichedEventWmStrategy = WatermarkStrategy
.<FullyEnrichedTrafficEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)) // Allow 5 seconds out-of-orderness
.withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());
DataStream<FullyEnrichedTrafficEvent> trafficEventsForCEP = fullyEnrichedEvents
.assignTimestampsAndWatermarks(fullyEnrichedEventWmStrategy) // Apply watermark strategy after enrichment
.filter(event -> event.roadSegmentId != null); // Ensure only valid road segments go to CEP
// Key the stream by road segment ID to detect patterns per road segment
DataStream<FullyEnrichedTrafficEvent> keyedTrafficEvents = trafficEventsForCEP
.keyBy(FullyEnrichedTrafficEvent::getRoadSegmentId);
// Pattern 1: Light Slowdown
TrafficCongestionConfig lightSlowdownConfig = new TrafficCongestionConfig(3, 0.7, 0, Duration.ofSeconds(60), 2, 50);
Pattern<FullyEnrichedTrafficEvent, ?> lightSlowdownPattern = Pattern.<FullyEnrichedTrafficEvent>begin("lightSlowEvent")
.where(new IterativeCondition<FullyEnrichedTrafficEvent>() {
@Override
public boolean filter(FullyEnrichedTrafficEvent event, Context<FullyEnrichedTrafficEvent> ctx) throws Exception {
return event.getSpeed() < (event.getSpeedLimitKmh() * lightSlowdownConfig.speedThresholdFactor);
}
})
.timesOrMore(lightSlowdownConfig.minEvents)
.greedy()
.within(Duration.ofMillis(lightSlowdownConfig.windowMillis));
DataStream<CongestionWarning> lightSlowdownWarnings = CEP.pattern(keyedTrafficEvents, lightSlowdownPattern).process(
new CongestionPatternProcessFunction("Light Slowdown", lightSlowdownConfig.speedThresholdFactor, 0.0, lightSlowdownConfig.minUniqueVehicles, lightSlowdownConfig.getTrafficLightNearbyRadiusInMeters()));
// Pattern 2: Sustained Slowdown (more severe than Light Slowdown)
TrafficCongestionConfig sustainedSlowdownConfig = new TrafficCongestionConfig(5, 0.5, 0, Duration.ofSeconds(60), 3, 30);
Pattern<FullyEnrichedTrafficEvent, ?> sustainedSlowdownPattern = Pattern.<FullyEnrichedTrafficEvent>begin("sustainedSlowEvent")
.where(new IterativeCondition<>() {
@Override
public boolean filter(FullyEnrichedTrafficEvent event, Context<FullyEnrichedTrafficEvent> ctx) throws Exception {
return event.getSpeed() < (event.getSpeedLimitKmh() * sustainedSlowdownConfig.speedThresholdFactor);
}
})
.timesOrMore(sustainedSlowdownConfig.minEvents)
.greedy()
.within(Duration.ofMillis(sustainedSlowdownConfig.windowMillis));
DataStream<CongestionWarning> sustainedSlowdownWarnings = CEP
.pattern(keyedTrafficEvents, sustainedSlowdownPattern)
.process(new CongestionPatternProcessFunction("Sustained Slowdown", sustainedSlowdownConfig.speedThresholdFactor, 0.0, sustainedSlowdownConfig.minUniqueVehicles, sustainedSlowdownConfig.getTrafficLightNearbyRadiusInMeters()));
// Pattern 3: Traffic Jam (most severe, near-stop)
TrafficCongestionConfig trafficJamConfig = new TrafficCongestionConfig(7, 0, 10, Duration.ofSeconds(60), 5, 30);
Pattern<FullyEnrichedTrafficEvent, ?> trafficJamPattern = Pattern.<FullyEnrichedTrafficEvent>begin("trafficJamEvent")
.where(new IterativeCondition<FullyEnrichedTrafficEvent>() {
@Override
public boolean filter(FullyEnrichedTrafficEvent event, Context<FullyEnrichedTrafficEvent> ctx) throws Exception {
return event.getSpeed() < trafficJamConfig.absoluteSpeedThresholdKmh; // Absolute low speed
}
})
.timesOrMore(trafficJamConfig.minEvents)
.greedy()
.within(Duration.ofMillis(trafficJamConfig.windowMillis));
DataStream<CongestionWarning> trafficJamWarnings = CEP.pattern(keyedTrafficEvents, trafficJamPattern).process(
new CongestionPatternProcessFunction("Traffic Jam", 0.0, trafficJamConfig.absoluteSpeedThresholdKmh, trafficJamConfig.minUniqueVehicles, trafficJamConfig.trafficLightNearbyRadiusInMeters));
DataStream<CongestionWarning> allWarnings = lightSlowdownWarnings
.union(sustainedSlowdownWarnings)
.union(trafficJamWarnings);
// Apply debouncing to the combined warnings stream with severity awareness
long debouncePeriodMs = Duration.ofSeconds(60).toMillis();
DataStream<CongestionWarning> debouncedWarnings = allWarnings
.keyBy(CongestionWarning::getRoadSegmentId)
.process(new WarningDebouncerFunction(debouncePeriodMs));
debouncedWarnings.print("DEBOUNCED CONGESTION WARNING");
env.execute("flink-jam: Traffic Congestion Warning System");
}
}
Debouncing Warnings using a KeyedProcessFunction
In order to debounce the warning events in the stream, we need to store the last warning generated by the system. The last event is going to be stored for a keyed stream, that's why we don't need to introduce additional fields.
package de.bytefish.flinkjam.models;
import java.io.Serializable;
/**
* Stores metadata about the last warning issued for a road segment.
*/
public class LastWarningMetadata implements Serializable {
public long timestamp;
public String warningType;
public int severity; // Numerical severity
public LastWarningMetadata() {}
public LastWarningMetadata(long timestamp, String warningType, int severity) {
this.timestamp = timestamp;
this.warningType = warningType;
this.severity = severity;
}
@Override
public String toString() {
return "LastWarningMetadata{" +
"timestamp=" + timestamp +
", warningType='" + warningType + '\'' +
", severity=" + severity +
'}';
}
}
For debouncing events we are using a KeyedProcessFunction
, that debounces the events. It emits
a warning, if the current warnings severity is higher than the previous once or if we exceeded the
debounce period.
package de.bytefish.flinkjam.cep;
// ...
/**
* We need to debounce the events, so we don't generate too many warnings. Imagine you
* have a traffic jam and the cars slowly fade out. You probably don't want to send
* events, when traffic clears up.
*/
public class WarningDebouncerFunction extends KeyedProcessFunction<String, CongestionWarning, CongestionWarning> {
private final long debouncePeriodMillis;
private transient ValueState<LastWarningMetadata> lastWarningMetadataState;
private transient Map<String, Integer> severityMap; // Maps warning type strings to integers
public WarningDebouncerFunction(long debouncePeriodMillis) {
this.debouncePeriodMillis = debouncePeriodMillis;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
lastWarningMetadataState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastWarningMetadata", TypeInformation.of(LastWarningMetadata.class))
);
// Initialize severity mapping
severityMap = new HashMap<>();
severityMap.put("Light Slowdown", 1);
severityMap.put("Sustained Slowdown", 2);
severityMap.put("Traffic Jam (Queue at Light)", 3);
severityMap.put("Traffic Jam", 4); // Highest severity
}
// Helper to get severity from a warning type string
private int getSeverity(String warningType) {
return severityMap.getOrDefault(warningType, 0); // Default to 0 for unknown types (lowest)
}
@Override
public void processElement(
CongestionWarning warning,
Context ctx,
Collector<CongestionWarning> out) throws Exception {
LastWarningMetadata lastMetadata = lastWarningMetadataState.value();
long currentTime = warning.timestamp;
int incomingSeverity = getSeverity(warning.warningType);
if (lastMetadata == null) {
// No previous warning for this segment, emit immediately
out.collect(warning);
lastWarningMetadataState.update(new LastWarningMetadata(currentTime, warning.warningType, incomingSeverity));
} else {
int lastSeverity = lastMetadata.severity;
long timeSinceLastWarning = currentTime - lastMetadata.timestamp;
if (incomingSeverity > lastSeverity) {
// Always emit if the new warning is more severe
System.out.println(String.format("Emitting %s warning for segment %s (Severity escalated from %s to %s).",
warning.warningType, warning.roadSegmentId, lastMetadata.warningType, warning.warningType));
out.collect(warning);
lastWarningMetadataState.update(new LastWarningMetadata(currentTime, warning.warningType, incomingSeverity));
} else if (timeSinceLastWarning >= debouncePeriodMillis) {
// Emit if the debounce period has passed, even if severity is same or lower
System.out.println(String.format("Emitting %s warning for segment %s (Debounce period passed).",
warning.warningType, warning.roadSegmentId));
out.collect(warning);
lastWarningMetadataState.update(new LastWarningMetadata(currentTime, warning.warningType, incomingSeverity));
} else {
// Suppress if less severe or same severity and within debounce period
System.out.println(String.format("Suppressing %s warning for road segment %s (last warning %d ms ago, severity %d <= %d).",
warning.warningType, warning.roadSegmentId, timeSinceLastWarning, incomingSeverity, lastSeverity));
}
}
}
}
Generating Warnings for the CEP Pattern Matches with a PatternProcessFunction
Now the final piece to the puzzle is the PatternProcessFunction
, that finally processes the our matched
patterns. It takes the List of FullyEnrichedTrafficEvent
and applies the logic for generating the warnings
based on our matches.
package de.bytefish.flinkjam.cep;
// ...
import java.util.*;
public class CongestionPatternProcessFunction extends org.apache.flink.cep.functions.PatternProcessFunction<FullyEnrichedTrafficEvent, CongestionWarning> {
private final String warningBaseType;
private final double speedThresholdFactor; // Used for "Slowdown" types relative to speed limit
private final double absoluteSpeedThresholdKmh; // Used for "Traffic Jam" types
private final int minUniqueVehicles;
private final double trafficLightRadius; // Radius used to consider lights "nearby"
// Constructor for Light and Sustained Slowdown (using speedThresholdFactor)
public CongestionPatternProcessFunction(String warningBaseType, double speedThresholdFactor, double absoluteSpeedThresholdKmh, int minUniqueVehicles, double trafficLightRadius) {
this.warningBaseType = warningBaseType;
this.speedThresholdFactor = speedThresholdFactor;
this.absoluteSpeedThresholdKmh = absoluteSpeedThresholdKmh; // Will be 0.0 for slowdowns
this.minUniqueVehicles = minUniqueVehicles;
this.trafficLightRadius = trafficLightRadius;
}
@Override
public void processMatch(Map<String, List<FullyEnrichedTrafficEvent>> match, Context ctx, Collector<CongestionWarning> out) throws Exception {
// The matched events will be under the first key defined in the pattern (e.g., "firstSlowEvent")
List<FullyEnrichedTrafficEvent> allMatchedEvents = null;
// Iterate through the map to find the list of matched events.
// The key used in `begin("patternName")` is what needs to be retrieved.
// For simple patterns with a single `begin` event, iterating `values()` is sufficient.
if (!match.isEmpty()) {
allMatchedEvents = match.values().iterator().next(); // Get the list from the first (and likely only) key
}
if (allMatchedEvents == null || allMatchedEvents.isEmpty()) {
return; // Should not happen if pattern is correctly defined, but good check
}
FullyEnrichedTrafficEvent firstEvent = allMatchedEvents.get(0);
double totalSpeed = 0;
Set<String> uniqueVehicles = new HashSet<>();
Set<TrafficLightInfo> uniqueRelatedLights = new HashSet<>(); // Use Set for uniqueness
for (FullyEnrichedTrafficEvent event : allMatchedEvents) {
totalSpeed += event.getSpeed();
uniqueVehicles.add(event.getVehicleId());
if (event.getNearbyTrafficLights() != null) {
uniqueRelatedLights.addAll(event.getNearbyTrafficLights());
}
}
// Apply unique vehicle count filter
if (uniqueVehicles.size() < minUniqueVehicles) {
System.out.println(String.format("Skipping %s warning: Not enough unique vehicles (%d < %d) on road segment %s.",
warningBaseType, uniqueVehicles.size(), minUniqueVehicles, firstEvent.roadSegmentId));
return;
}
double averageSpeed = totalSpeed / allMatchedEvents.size();
// Check speed of the last event to prevent stale warnings
FullyEnrichedTrafficEvent lastEvent = allMatchedEvents.get(allMatchedEvents.size() - 1);
boolean lastEventStillSlowEnough = false;
if (warningBaseType.equals("Traffic Jam")) {
if (lastEvent.getSpeed() < absoluteSpeedThresholdKmh * 1.5) { // Allow some slight increase but still very slow
lastEventStillSlowEnough = true;
}
} else {
if (lastEvent.getSpeed() < (lastEvent.getSpeedLimitKmh() * (speedThresholdFactor + 0.15))) { // Allow slightly higher speed than threshold
lastEventStillSlowEnough = true;
}
}
if (!lastEventStillSlowEnough) {
System.out.println(String.format("Skipping %s warning: Last event speed (%.2f km/h) indicates clearing traffic or transient slowdown on road segment %s.",
warningBaseType, lastEvent.getSpeed(), firstEvent.roadSegmentId));
return;
}
String actualWarningType = warningBaseType;
String details;
// Use speedThresholdFactor (for relative) or absoluteSpeedThresholdKmh (for absolute)
if (warningBaseType.equals("Traffic Jam")) {
details = String.format("Severe congestion (Avg Speed: %.2f km/h, below %.1f km/h) detected on road segment %s (Type: %s). Involved vehicles: %d.",
averageSpeed, absoluteSpeedThresholdKmh, firstEvent.roadSegmentId, firstEvent.roadType, uniqueVehicles.size());
} else {
double thresholdSpeed = firstEvent.getSpeedLimitKmh() * speedThresholdFactor;
details = String.format("Average speed (%.2f km/h) is below %.1f%% of speed limit (%.1f km/h vs %d km/h) on road segment %s (Type: %s). Involved vehicles: %d.",
averageSpeed, speedThresholdFactor * 100, thresholdSpeed, firstEvent.speedLimitKmh, firstEvent.roadSegmentId, firstEvent.roadType, uniqueVehicles.size());
}
// Traffic Light Consideration Logic
if (!uniqueRelatedLights.isEmpty()) {
boolean isNearTrafficLight = false;
for (TrafficLightInfo light : uniqueRelatedLights) {
if (light.distanceMeters <= trafficLightRadius) {
isNearTrafficLight = true;
break;
}
}
if (isNearTrafficLight) {
if (warningBaseType.equals("Traffic Jam")) {
// If it's a Traffic Jam and near a light, refine the type
actualWarningType = "Traffic Jam (Queue at Light)";
details = String.format("Severe congestion (Avg Speed: %.2f km/h) detected near traffic light(s) on segment %s. Possible queue at intersection. Involved vehicles: %d.",
averageSpeed, firstEvent.roadSegmentId, uniqueVehicles.size());
} else if (warningBaseType.equals("Sustained Slowdown") || warningBaseType.equals("Light Slowdown")) {
// For slowdowns near a light, add a note
details += " (Near traffic light(s))";
}
} else {
// If it's a Traffic Jam AND NOT near a light, it's a more serious general jam
if (warningBaseType.equals("Traffic Jam")) {
details = String.format("Severe congestion (Avg Speed: %.2f km/h) detected on road segment %s. No immediate traffic light nearby. Potential incident/bottleneck. Involved vehicles: %d.",
averageSpeed, firstEvent.roadSegmentId, uniqueVehicles.size());
}
}
}
CongestionWarning warning = new CongestionWarning(
ctx.timestamp(),
firstEvent.roadSegmentId,
actualWarningType,
averageSpeed,
firstEvent.speedLimitKmh,
details,
new ArrayList<>(uniqueRelatedLights), // Convert Set back to List for output
uniqueVehicles.size()
);
out.collect(warning);
}
}
Running the Apache Flink Pipeline
Since Java 11 we need to explictly add the Java classes we need to use Reflection on.
In the IntelliJ Run Configuration I am adding the CLI Parameters to the JVM options. I don't know if I need them all, but... yolo:
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.io=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.base=ALL-UNNAMED
And when running the application, we can following the entire stream processing and see a warning generated for the light slowdown on the Autobahn A1, that we have defined in our simulated data:
Raw Traffic Event> RawTrafficEvent{timestamp=1751097526846, vehicleId='Car002', latitude=52.194512, longitude=7.797632, speed=95.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097526846, vehicleId='Car002', latitude=52.194512, longitude=7.797632, speed=95.0, roadSegmentId='326809667', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097526846, vehicleId='Car002', latitude=52.194512, longitude=7.797632, speed=95.0, roadSegmentId='326809667', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
Raw Traffic Event> RawTrafficEvent{timestamp=1751097531846, vehicleId='Car001', latitude=52.115703, longitude=7.702582, speed=65.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097531846, vehicleId='Car001', latitude=52.115703, longitude=7.702582, speed=65.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097531846, vehicleId='Car001', latitude=52.115703, longitude=7.702582, speed=65.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
Raw Traffic Event> RawTrafficEvent{timestamp=1751097536846, vehicleId='Car002', latitude=52.115849, longitude=7.702784, speed=60.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097536846, vehicleId='Car002', latitude=52.115849, longitude=7.702784, speed=60.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097536846, vehicleId='Car002', latitude=52.115849, longitude=7.702784, speed=60.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
Raw Traffic Event> RawTrafficEvent{timestamp=1751097541846, vehicleId='Car003', latitude=52.115611, longitude=7.702527, speed=68.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097541846, vehicleId='Car003', latitude=52.115611, longitude=7.702527, speed=68.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097541846, vehicleId='Car003', latitude=52.115611, longitude=7.702527, speed=68.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
Raw Traffic Event> RawTrafficEvent{timestamp=1751097546846, vehicleId='Car001', latitude=52.116178, longitude=7.703204, speed=62.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097546846, vehicleId='Car001', latitude=52.116178, longitude=7.703204, speed=62.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097546846, vehicleId='Car001', latitude=52.116178, longitude=7.703204, speed=62.0, roadSegmentId='25995891', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
Raw Traffic Event> RawTrafficEvent{timestamp=1751097551846, vehicleId='Car008', latitude=52.041431, longitude=7.607906, speed=45.0}
Road Enriched Event> RoadEnrichedTrafficEvent{timestamp=1751097551846, vehicleId='Car008', latitude=52.041431, longitude=7.607906, speed=45.0, roadSegmentId='26305583', speedLimitKmh=130, roadType='Autobahn'}
Fully Enriched Event> FullyEnrichedTrafficEvent{timestamp=1751097551846, vehicleId='Car008', latitude=52.041431, longitude=7.607906, speed=45.0, roadSegmentId='26305583', speedLimitKmh=130, roadType='Autobahn', nearbyTrafficLights=[]}
DEBOUNCED CONGESTION WARNING> !!! CONGESTION ALERT !!! Timestamp=1751097541846, RoadSegmentId='25995891', Type='Light Slowdown', AvgSpeed=64,33 km/h, SpeedLimit=130 km/h, Details='Average speed (64,33 km/h) is below 70,0% of speed limit (91,0 km/h vs 130 km/h) on road segment 25995891 (Type: Autobahn). Involved vehicles: 3.', UniqueVehicles=3, RelatedLights=[]
Conclusion
And that's it! We now have a nice playground for exploring other ideas.
Let's see what we could build upon it!