/** Uploads each pending partition's payload to the catalog identified by `hrn`
  * and emits the resulting commit metadata.
  *
  * The HERE data client is not serializable, so it is created lazily on the
  * task manager (`@transient lazy val`) rather than shipped with the closure,
  * and released when the task shuts down.
  */
class UploadDataFunction(hrn: HRN)
    extends RichMapFunction[PendingPartition, CommitPartition]
    with Serializable {

  // Constructed on first access on the worker; excluded from serialization.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val engine: FlinkWriteEngine = client.writeEngine(hrn)

  // Release client resources at task shutdown.
  override def close(): Unit = client.terminate()

  // Upload one partition's data; the returned CommitPartition is published later.
  override def map(pendingPartition: PendingPartition): CommitPartition =
    engine.put(pendingPartition)
}
/** Publishes each window's batch of commit partitions to the index layer
  * `indexLayer` of the catalog `hrn`, emitting a status message per window.
  *
  * The HERE data client is not serializable, so it is created lazily on the
  * task manager (`@transient lazy val`) and released when the task shuts down.
  */
class PublishIndexWindowFunction(hrn: HRN, indexLayer: String)
    extends RichAllWindowFunction[CommitPartition, String, TimeWindow]
    with Serializable {

  // Constructed on first access on the worker; excluded from serialization.
  @transient
  private lazy val client: FlinkDataClient = new FlinkDataClient()

  @transient
  private lazy val publisher: FlinkPublishApi = client.publishApi(hrn)

  // Release client resources at task shutdown.
  override def close(): Unit = client.terminate()

  // Index the whole window's partitions in one publish call, then report success.
  override def apply(window: TimeWindow,
                     partitions: Iterable[CommitPartition],
                     out: Collector[String]): Unit = {
    publisher.publishIndex(indexLayer, partitions.iterator)
    out.collect(s"indexing a new partition of $indexLayer layer is a success")
  }
}
// Pipeline wiring: upload pending partitions, then publish them to the index
// layer in one-minute event-time batches.
// Source stream of partitions awaiting upload (provided elsewhere in the file).
val pendingPartitions: DataStream[PendingPartition] =
getPendingPartitionsStream()
// Upload each partition's payload; each element becomes a commit record.
val newPartitions: DataStream[CommitPartition] =
pendingPartitions.map(new UploadDataFunction(hrn))
// Batch commits into 1-minute tumbling event-time windows so each publish
// call indexes a whole window at once instead of one partition at a time.
val newPartitionsWindow: AllWindowedStream[CommitPartition, TimeWindow] =
newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)))
// Publish every window's batch to the index layer; emits a status string per window.
val results: DataStream[String] =
newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer))
/**
 * Uploads each {@link PendingPartition}'s payload to the catalog identified by
 * the given HRN and returns the resulting {@link CommitPartition}.
 *
 * <p>The HERE data client is not serializable, so it is created per task in
 * {@link #open(Configuration)} and released in {@link #close()}.
 */
class UploadDataFunction extends RichMapFunction<PendingPartition, CommitPartition>
    implements Serializable {

  private final HRN hrn;
  // Created in open(); transient because the client cannot be serialized.
  private transient FlinkDataClient dataClient;
  private transient FlinkWriteEngine writeEngine;

  public UploadDataFunction(HRN hrn) {
    this.hrn = hrn;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    writeEngine = dataClient.writeEngine(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink invokes close() even when open() failed; guard against the NPE
    // that would otherwise mask the original failure.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public CommitPartition map(PendingPartition pendingPartition) throws Exception {
    return writeEngine.put(pendingPartition);
  }
}
/**
 * Publishes each window's batch of {@link CommitPartition}s to the configured
 * index layer of the catalog identified by the given HRN, emitting one status
 * message per window.
 *
 * <p>The HERE data client is not serializable, so it is created per task in
 * {@link #open(Configuration)} and released in {@link #close()}.
 */
class PublishIndexWindowFunction
    extends RichAllWindowFunction<CommitPartition, String, TimeWindow> implements Serializable {

  private final HRN hrn;
  private final String indexLayer;
  // Created in open(); transient because the client cannot be serialized.
  private transient FlinkDataClient dataClient;
  private transient FlinkPublishApi publishApi;

  public PublishIndexWindowFunction(HRN hrn, String indexLayer) {
    this.hrn = hrn;
    this.indexLayer = indexLayer;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    dataClient = new FlinkDataClient();
    publishApi = dataClient.publishApi(hrn);
  }

  @Override
  public void close() throws Exception {
    // Flink invokes close() even when open() failed; guard against the NPE
    // that would otherwise mask the original failure.
    if (dataClient != null) {
      dataClient.terminate();
    }
  }

  @Override
  public void apply(
      TimeWindow window, Iterable<CommitPartition> commitPartitions, Collector<String> out)
      throws Exception {
    // Index the whole window's partitions in one publish call, then report success.
    publishApi.publishIndex(indexLayer, commitPartitions.iterator());
    out.collect("indexing a new partition of " + indexLayer + " layer is a success");
  }
}
// Pipeline wiring: upload pending partitions, then publish them to the index
// layer in one-minute event-time batches.
// Source stream of partitions awaiting upload (provided elsewhere in the file).
DataStream<PendingPartition> pendingPartitions = getPendingPartitionsStream();
// Upload each partition's payload; each element becomes a commit record.
DataStream<CommitPartition> newPartitions = pendingPartitions.map(new UploadDataFunction(hrn));
// Batch commits into 1-minute tumbling event-time windows so each publish
// call indexes a whole window at once instead of one partition at a time.
AllWindowedStream<CommitPartition, TimeWindow> newPartitionsWindow =
newPartitions.windowAll(TumblingEventTimeWindows.of(Time.minutes(1)));
// Publish every window's batch to the index layer; emits a status string per window.
DataStream<String> results =
newPartitionsWindow.apply(new PublishIndexWindowFunction(hrn, indexLayer));