public abstract class ForeachWriter<T>
extends Object
implements scala.Serializable
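Initialization for writing data (for example, opening a connection or starting a transaction) should be done only after the `open(...)` method has been called, which signifies that the task is ready to generate data.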
- For each partition with `partitionId`:
  - For each batch/epoch of streaming data (if it is a streaming query) with `epochId`:
    - Method `open(partitionId, epochId)` is called.
    - If `open` returns true: for each row in the partition and batch/epoch, method `process(row)` is called.
    - Method `close(errorOrNull)` is called with the error (if any) seen while processing rows.
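
The call order described above can be illustrated with a small, Spark-free simulation. This is only a sketch under stated assumptions: `WriterSketch` is a hypothetical stand-in that mirrors the three `ForeachWriter` methods, and `LifecycleSketch.runPartition` is a hypothetical driver that imitates the documented order for one partition of one batch/epoch. It is not Spark's actual task code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three ForeachWriter methods, so the
// sketch runs without Spark on the classpath. It is not Spark's class.
abstract class WriterSketch<T> {
    abstract boolean open(long partitionId, long epochId);
    abstract void process(T value);
    abstract void close(Throwable errorOrNull);
}

// Hypothetical driver that imitates the documented call order for one
// partition of one batch/epoch.
final class LifecycleSketch {
    static <T> void runPartition(WriterSketch<T> writer, long partitionId,
                                 long epochId, List<T> rows) {
        // If open throws, the exception propagates and close is never called.
        boolean shouldProcess = writer.open(partitionId, epochId);
        try {
            if (shouldProcess) {
                for (T row : rows) {
                    writer.process(row);
                }
            }
        } catch (RuntimeException | Error e) {
            writer.close(e);   // close receives the error seen while processing
            throw e;
        }
        writer.close(null);    // close runs whether open returned true or false
    }

    // Records every callback so the call order can be inspected.
    static List<String> trace(boolean openResult, List<String> rows) {
        List<String> calls = new ArrayList<>();
        runPartition(new WriterSketch<String>() {
            @Override boolean open(long partitionId, long epochId) {
                calls.add("open(" + partitionId + "," + epochId + ")");
                return openResult;
            }
            @Override void process(String value) {
                calls.add("process(" + value + ")");
            }
            @Override void close(Throwable errorOrNull) {
                calls.add("close(" + errorOrNull + ")");
            }
        }, 0L, 7L, rows);
        return calls;
    }
}
```

With `openResult` set to `false`, the trace contains only the `open` and `close` calls, which matches the rule that rows are processed only when `open` returns true.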
Important points to note:
`partitionId` and `epochId` can be used to deduplicate generated data when failures cause reprocessing of some input data. Whether this works depends on the execution mode of the query. If the streaming query is executed in micro-batch mode, every partition identified by a unique tuple (partitionId, epochId) is guaranteed to contain the same data. Hence, (partitionId, epochId) can be used to deduplicate and/or transactionally commit data and achieve exactly-once guarantees. However, if the streaming query is executed in continuous mode, this guarantee does not hold, so the tuple should not be used for deduplication.
The `close()` method will be called if the `open()` method returns successfully (irrespective of the return value), except if the JVM crashes in the middle.
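
The two points above suggest one possible pattern for micro-batch queries: skip a (partitionId, epochId) pair that was already committed, stage rows during `process`, and commit in `close` only when no error was reported. The following is a minimal sketch of that idea, not a definitive implementation. `SinkSketch` is a hypothetical in-memory stand-in for an external transactional store, and `DedupWriterSketch` only mirrors the shape of the `ForeachWriter` methods without extending the real class, so it compiles without Spark.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory stand-in for an external transactional store.
// A real sink would record the committed key and the rows atomically.
final class SinkSketch {
    final Set<String> committedKeys = new HashSet<>();
    final List<String> rows = new ArrayList<>();
}

// Mirrors the open/process/close shape of ForeachWriter without extending it.
final class DedupWriterSketch {
    private final SinkSketch sink;
    private final List<String> buffer = new ArrayList<>();
    private String key;

    DedupWriterSketch(SinkSketch sink) {
        this.sink = sink;
    }

    boolean open(long partitionId, long epochId) {
        key = partitionId + ":" + epochId;
        buffer.clear();
        // Skip the partition if this (partitionId, epochId) was already committed.
        return !sink.committedKeys.contains(key);
    }

    void process(String value) {
        buffer.add(value);   // stage the row; nothing is visible in the sink yet
    }

    void close(Throwable errorOrNull) {
        // close also runs when open returned false, so check before committing.
        if (errorOrNull == null && !sink.committedKeys.contains(key)) {
            sink.rows.addAll(buffer);
            sink.committedKeys.add(key);
        }
        buffer.clear();      // on error the staged rows are simply dropped
    }
}
```

In a real writer the connection to the store would be created inside `open`, because the writer instance is serialized and shipped to each task. This pattern relies on the micro-batch guarantee and should not be used in continuous mode.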
Scala example:

    import org.apache.spark.sql.ForeachWriter

    datasetOfString.writeStream.foreach(new ForeachWriter[String] {
      def open(partitionId: Long, epochId: Long): Boolean = {
        // open connection
        true // process this partition and epoch
      }
      def process(record: String): Unit = {
        // write string to connection
      }
      def close(errorOrNull: Throwable): Unit = {
        // close the connection
      }
    })

Java example:

    import org.apache.spark.sql.ForeachWriter;

    datasetOfString.writeStream().foreach(new ForeachWriter<String>() {
      @Override
      public boolean open(long partitionId, long epochId) {
        // open connection
        return true; // process this partition and epoch
      }
      @Override
      public void process(String value) {
        // write string to connection
      }
      @Override
      public void close(Throwable errorOrNull) {
        // close the connection
      }
    });

Constructor and Description |
---|
ForeachWriter() |
Modifier and Type | Method and Description |
---|---|
abstract void | `close(Throwable errorOrNull)` Called when the executor stops processing one partition of new data. |
abstract boolean | `open(long partitionId, long epochId)` Called when the executor starts processing one partition of new data. |
abstract void | `process(T value)` Called to process the data in the executor. |
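public abstract void close(Throwable errorOrNull)
Called when the executor stops processing one partition of new data. This method is guaranteed to be called whether `open` returns `true` or `false`. However, `close` won't be called in the following cases:
- The JVM crashes without throwing a `Throwable`.
- `open` throws a `Throwable`.

Parameters:
errorOrNull - the error thrown during processing data or null if there was no error.

public abstract boolean open(long partitionId, long epochId)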
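Called when the executor starts processing one partition of new data. See the class description for how to use `partitionId` and `epochId`.

Parameters:
partitionId - the partition id.
epochId - a unique id for data deduplication.

Returns:
`true` if the corresponding partition and epoch id should be processed. `false` indicates the partition should be skipped.

public abstract void process(T value)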
Called to process the data in the executor. This method is called only if `open` returns `true`.

Parameters:
value - the row to process.