Write your first batch
Writing a batch is mainly about writing a main(String...) method which uses Yupiik Batch components.
Create a batch
Implementing a batch means implementing the Batch interface:
public class MyBatch implements Batch<MyConf> {
    @Override
    public void accept(final MyConf configuration) {
        // ...
    }
}
The important point is that a batch has a configuration and that this configuration is "injected" into the accept method. This lets the framework map the main arguments to the configuration in a unified fashion.
Once you have a batch implementation, you can run it using the Batch launcher method:
public static void main(final String... args) {
    Batch.run(MyBatch.class, args);
}
Define your batch configuration
A batch configuration is a class with one or more fields decorated with @Param:
public class DataSourceConfiguration {
    @Param(description = "Driver to use", required = true)
    private String driver;
    @Param(description = "JDBC URL to use", required = true)
    private String url;
    @Param(description = "Database username.")
    private String username;
    @Param(description = "Database password.")
    private String password;
}
NOTE
|
fields are injected so they shouldn't be |
TIP
|
passing |
Reusable batch components
Reusable components are in the io.yupiik.batch.runtime.component package. This section highlights a few of them.
io.yupiik.batch.runtime.component.AcceptedLossDiffFilter
Filters a Diff. If the acceptedLoss threshold is not respected, i.e. more rows would be deleted than this percentage allows, the chain ends there.
The goal is to avoid wiping a database when incoming data is corrupted or not properly up to date.
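The check described above can be sketched in plain Java. This is a minimal illustration, not the AcceptedLossDiffFilter source: the DiffSummary record, the AcceptedLossSketch class and the exact ratio formula (deleted rows divided by reference rows) are assumptions made for the example.

```java
import java.util.function.Predicate;

// Hypothetical, simplified diff: only the counts needed by the check.
record DiffSummary(int referenceRows, int deletedRows) {}

// Sketch of an accepted-loss guard (assumed semantics, not the library code).
final class AcceptedLossSketch implements Predicate<DiffSummary> {
    private final double acceptedLoss; // e.g. 0.10 for 10%

    AcceptedLossSketch(final double acceptedLoss) {
        this.acceptedLoss = acceptedLoss;
    }

    @Override
    public boolean test(final DiffSummary diff) {
        if (diff.referenceRows() == 0) {
            return true; // nothing can be lost from an empty reference
        }
        final double loss = (double) diff.deletedRows() / diff.referenceRows();
        return loss <= acceptedLoss; // false stops the chain
    }

    public static void main(final String... args) {
        final var filter = new AcceptedLossSketch(0.10);
        System.out.println(filter.test(new DiffSummary(1000, 50)));  // prints "true"
        System.out.println(filter.test(new DiffSummary(1000, 400))); // prints "false"
    }
}
```

Because the guard is a Predicate, it can be plugged into any filter step.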
io.yupiik.batch.runtime.component.DatasetDiffComputer
Component which takes as input two iterators representing sorted datasets.
Both datasets are compared, in streaming mode, using the Comparator passed to the constructor/factory method. This detects deletions and additions, which are reflected in the resulting Diff instance.
When the Comparator considers two rows equal, the BiPredicate is used to check whether they are actually equal. If not, the data is considered updated; otherwise it is unchanged and left out of the diff.
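The comparison described above is a classic merge of two sorted iterators. The following sketch shows that idea only; SimpleDiff, Row and SortedDiffSketch are hypothetical names, and the code assumes both inputs are sorted with the same Comparator and contain no null element (null is used as the "exhausted" marker).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical result holder and row type, not the library's classes.
record SimpleDiff<T>(List<T> added, List<T> updated, List<T> deleted) {}
record Row(int id, String name) {}

final class SortedDiffSketch {
    private SortedDiffSketch() {}

    // Returns the next element or null when the iterator is exhausted.
    private static <T> T next(final Iterator<T> it) {
        return it.hasNext() ? it.next() : null;
    }

    static <T> SimpleDiff<T> diff(final Iterator<T> reference, final Iterator<T> incoming,
                                  final Comparator<T> keyOrder, final BiPredicate<T, T> sameContent) {
        final List<T> added = new ArrayList<>();
        final List<T> updated = new ArrayList<>();
        final List<T> deleted = new ArrayList<>();
        T ref = next(reference);
        T in = next(incoming);
        while (ref != null || in != null) {
            final int order = ref == null ? 1 : in == null ? -1 : keyOrder.compare(ref, in);
            if (order < 0) {          // key only in the reference: deletion
                deleted.add(ref);
                ref = next(reference);
            } else if (order > 0) {   // key only in the incoming data: addition
                added.add(in);
                in = next(incoming);
            } else {                  // same key: check the content
                if (!sameContent.test(ref, in)) {
                    updated.add(in);
                }
                ref = next(reference);
                in = next(incoming);
            }
        }
        return new SimpleDiff<>(added, updated, deleted);
    }

    public static void main(final String... args) {
        final Comparator<Row> byId = Comparator.comparingInt(Row::id);
        final BiPredicate<Row, Row> same = Row::equals;
        final var diff = diff(
                List.of(new Row(1, "a"), new Row(2, "b"), new Row(3, "c")).iterator(),
                List.of(new Row(2, "B"), new Row(3, "c"), new Row(4, "d")).iterator(),
                byId, same);
        System.out.println("deleted=" + diff.deleted());
        System.out.println("updated=" + diff.updated());
        System.out.println("added=" + diff.added());
    }
}
```

Only one row of each side is held in memory at a time, which is why the inputs must be sorted.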
io.yupiik.batch.runtime.component.DiffExecutor
Applies a Diff, typically produced by a DatasetDiffComputer.
The diff is applied to the database represented by the connectionSupplier, using the provided commitInterval. The statements are created using the related factories: insertFactory, updateFactory and deleteFactory.
Finally, the dryRun toggle simulates the processing without issuing any modification in the database.
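To illustrate how a commit interval and a dry-run toggle usually interact, here is a small sketch. It does not use JDBC or the DiffExecutor API: the Tx interface and the CommitIntervalSketch class are hypothetical stand-ins, and commitInterval is assumed to be strictly positive.

```java
import java.util.List;

final class CommitIntervalSketch {
    private CommitIntervalSketch() {}

    // Hypothetical minimal transaction abstraction standing in for a database connection.
    interface Tx {
        void execute(String statement);
        void commit();
    }

    // Executes the statements and commits every `commitInterval` statements.
    // In dry-run mode nothing is executed or committed. Returns the number of commits.
    static int apply(final List<String> statements, final Tx tx,
                     final int commitInterval, final boolean dryRun) {
        int commits = 0;
        int pending = 0;
        for (final String statement : statements) {
            if (dryRun) {
                System.out.println("[dry-run] " + statement);
                continue;
            }
            tx.execute(statement);
            if (++pending == commitInterval) {
                tx.commit();
                commits++;
                pending = 0;
            }
        }
        if (pending > 0) { // flush the last, incomplete batch
            tx.commit();
            commits++;
        }
        return commits;
    }

    public static void main(final String... args) {
        final Tx loggingTx = new Tx() {
            @Override
            public void execute(final String statement) {
                System.out.println("execute " + statement);
            }

            @Override
            public void commit() {
                System.out.println("commit");
            }
        };
        final int commits = apply(List.of("s1", "s2", "s3", "s4", "s5"), loggingTx, 2, false);
        System.out.println(commits); // prints "3"
    }
}
```

A larger interval means fewer commits but more work lost if a failure occurs mid-batch.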
io.yupiik.batch.runtime.component.Mapper
This mapping component converts an input instance to an output instance based on a specification.
The specification is a class decorated with @Mapping:
@Mapping(
    from = IncomingModel.class,
    to = OutputModel.class,
    documentation = "Converts an input to an output.",
    properties = {
        @Property(type = CONSTANT, to = "outputFieldUrl", value = "https://foo.bar/"),
        @Property(type = TABLE_MAPPING, from = "inputKeyField", to = "mappedOutput", value = "myLookupTable", onMissedTableLookup = FORWARD)
    },
    tables = {
        @Mapping.MappingTable(
            name = "myLookupTable",
            entries = {
                @Entry(input = "A", output = "1"),
                @Entry(input = "C", output = "3")
            }
        )
    })
public class MyMapperSpec {
    // the @Table lookup parameter is optional
    @Mapping.Custom(description = "This will map X to Y.")
    String outputField(final IncomingModel in, @Table("myLookupTable") final Map<String, String> myLookupTable) {
        return ...;
    }
}
To get a mapper, call Mapper.mapper(MyMapperSpec.class); you can then insert this mapper in any BatchChain.
The specification API supports static mapping (properties) as well as custom mapping (@Mapping.Custom) for more advanced logic.
The companion class io.yupiik.batch.runtime.documentation.MapperDocGenerator generates Asciidoctor documentation for a mapper class.
io.yupiik.batch.runtime.component.SQLQuery
Extracts data from a SQL query.
A custom mapper is called for each ResultSet row to convert the current row into an object passed to the rest of the BatchChain.
io.yupiik.batch.runtime.component.uship.DatabaseDiffExecutor
Applies a Diff, typically produced by a DatasetDiffComputer, when the entities (diff model) are Yupiik UShip Persistence models.
The diff is applied to a database using the provided commitInterval.
The dryRun toggle simulates the processing without issuing any modification in the database.
Combine components
Combining components can be done in a plain main:
final var referenceData = new ReferenceDataSQLQuery(databaseConnection);
final var newInputs = new MyNewInputs();
final var comparingProcessor = new ReferencialRowDatasetDiffComputer();
final var diff = comparingProcessor.apply(referenceData, newInputs);
if (new AcceptedLossDiffFilter(0.10).test(diff)) {
    new ReferenceDataDiffExecutor(databaseConnection, 25).accept(diff);
}
However, when this gets too noisy, a fluent API on the diff can make the code more readable and composable. For instance, you can use the Stream or Optional API:
// init
final var diff = new ReferencialRowDatasetDiffComputer()
    .apply(new ReferenceDataSQLQuery(databaseConnection), new MyNewInputs());
// start the flow
Stream.of(diff)
    .filter(new AcceptedLossDiffFilter(0.10))
    .forEach(new ReferenceDataDiffExecutor(databaseConnection, 25));
// or
Optional.of(diff)
    .filter(new AcceptedLossDiffFilter(0.10))
    .ifPresent(new ReferenceDataDiffExecutor(databaseConnection, 25));
The Stream or Optional fluent API makes the processing flow more explicit. It also becomes easier to insert an element or decorate components explicitly.
However, these two APIs are not designed for that and will quickly hit some limitations. To make it more batch oriented, the parent Batch type lets you define a stream-like flow dedicated to batches. You have to start your flow with from() or use a specific source such as DatasetDiffComputer. For example:
@Override
public void accept(final MyConfiguration configuration) {
    final var connectionProvider = configuration.datasource.toConnectionProvider();
    referencialRowDatasetDiff()
        .withCustomData(myInput())
        .withReferencialData(referenceData(datasource, configuration.table))
        .diff()
        .filter(withAcceptedLoss(configuration.acceptedLoss))
        .then(applyDiff(connectionProvider, configuration.commitInterval, configuration.dryRun, configuration.table))
        .run(runConfiguration(datasource, getClass().getSimpleName(), systemUTC()));
}
This DSL is more friendly to the batches we write (integrating with default components).
IMPORTANT
|
until you hit |
TIP
|
some components have a static factory to make it more expressive, don't hesitate to use it. |
Finally, the RunConfiguration lets you intercept any step of the BatchChain defined by the previous DSL. Combined with ExecutionTracer, it lets you store any execution and its steps in a database for audit or monitoring/administration purposes.
Async result
It can be useful to pass the result of a step to the next step before the first step has finished.
This is often the case for reactive batches (one "thread" starts polling data, the next step processes it, and so on, but you want to keep polling and processing split in terms of "step" and tracing).
To do that, you can return a BatchPromise, which is just a holder of a value (reactive in our example) and a CompletionStage which notifies the batch runtime and tracer when the step is done:
from()
    .map("step1", new CommentifiableFunction<Void, BatchPromise<String>>() {
        @Override
        public BatchPromise<String> apply(final Void o) {
            final var reactiveComponent = runStep1(); // an Observable with RxJava for example
            return BatchPromise.of(reactiveComponent::onItem, reactiveComponent.toCompletionStage());
        }
    })
    // a function (not a consumer) since step2 returns its own promise
    .map("step2", new CommentifiableFunction<BatchPromise<Observable>, BatchPromise<Void>>() {
        @Override
        public BatchPromise<Void> apply(final BatchPromise<Observable> in) {
            final var promise = new CompletableFuture<Void>();
            in.subscribe(
                this::doStep2ItemProcessing,
                error -> {
                    // log etc...
                    promise.completeExceptionally(error);
                },
                () -> promise.complete(null));
            return BatchPromise.of(null, promise);
        }
    })
    .run(tracingConfig);
Overall, step1 starts reading some data and emitting it, then step2 starts and subscribes to it.
When step1 is done, it notifies the batch runtime, and the tracer (or runtime) stops monitoring thread 1.
Step2 is asynchronous too: due to its reactive nature, it also emits a BatchPromise, leading to the same kind of behavior.
TIP
|
thanks to this trick, you can run a concurrent job with a flat chain since you can pass in the |
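The idea of "a value usable right away plus a completion signal" can be sketched without any reactive library. The example below is an illustration under assumptions: StepPromise and AsyncStepsSketch are hypothetical names (this is not the BatchPromise API), a BlockingQueue plays the role of the reactive stream, and an EOF marker signals the end of the data.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical holder: a value available immediately and a stage completed when the step ends.
record StepPromise<T>(T value, CompletionStage<Void> end) {}

final class AsyncStepsSketch {
    static final String EOF = "<eof>"; // end-of-data marker (assumption of this sketch)

    private AsyncStepsSketch() {}

    // Runs the task on a daemon thread and exposes its completion.
    private static CompletionStage<Void> async(final Runnable task) {
        final CompletableFuture<Void> done = new CompletableFuture<>();
        final Thread thread = new Thread(() -> {
            try {
                task.run();
                done.complete(null);
            } catch (final RuntimeException e) {
                done.completeExceptionally(e);
            }
        });
        thread.setDaemon(true);
        thread.start();
        return done;
    }

    // Step 1: starts producing items and returns immediately.
    static StepPromise<BlockingQueue<String>> step1() {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        return new StepPromise<>(queue, async(() -> {
            for (int i = 0; i < 3; i++) {
                queue.add("item-" + i);
            }
            queue.add(EOF);
        }));
    }

    // Step 2: consumes items while step 1 may still be running.
    static StepPromise<List<String>> step2(final StepPromise<BlockingQueue<String>> in) {
        final List<String> processed = new CopyOnWriteArrayList<>();
        return new StepPromise<>(processed, async(() -> {
            try {
                for (String item = in.value().take(); !EOF.equals(item); item = in.value().take()) {
                    processed.add(item.toUpperCase());
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }));
    }

    public static void main(final String... args) {
        final var first = step1();
        final var second = step2(first);
        second.end().toCompletableFuture().join(); // wait for the end of the chain
        System.out.println(second.value()); // prints "[ITEM-0, ITEM-1, ITEM-2]"
    }
}
```

Each step keeps its own completion stage, so a tracer could time polling and processing separately even though they overlap.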
Reusable Iterators
Reusable iterators are provided either through FluentIterator or through extensions (in which case you must add a dependency to get them).
io.yupiik.batch.iterator.excel.component.ExcelIterator
Reads an Excel file sheet row by row.
Factories
<A> ExcelIterator<A> ExcelIterator.of(Path,int,RowMapper<A>)
Creates a new ExcelIterator with a custom row mapper.
ExcelIterator<List<String>> ExcelIterator.ofLines(Path,int)
Creates a new ExcelIterator with a default row mapper mapping lines as List<String>.
Dependency
<dependency>
    <groupId>io.yupiik.batch</groupId>
    <artifactId>excel-iterator</artifactId>
    <version>${yupiik-batch.version}</version>
</dependency>
TIP
Use FluentIterator to filter and map your input Iterator; it makes the code more readable and adds some features Stream does not have, such as a more advanced distinct implementation.
Here is a sample input iterator written using FluentIterator, filtering its data with some business rule on the raw input and post processing the mapped data with PspRolePasswordInjector.
final var iterator = FluentIterator.of(myIterator()) (1)
.filter(new MyBusinessFilter()) (2)
.map(new MyBusinessMapper()) (3)
.unwrap(); (4)
(1) Create a FluentIterator from the myIterator() result,
(2) Filter the iterator with a Predicate,
(3) Map the iterator data to another model with a Mapper,
(4) Remove the enclosing FluentIterator wrapper, which is not needed once the full iterator chain is defined (optional).
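To show what such a chain does under the hood, here is a minimal lazy filter/map wrapper around a plain Iterator. ChainedIterator is a hypothetical class written for this illustration; it is not the FluentIterator implementation and only mirrors the of/filter/map/unwrap shape used above.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical wrapper illustrating lazy filter/map chaining on an Iterator.
final class ChainedIterator<A> implements Iterator<A> {
    private final Iterator<A> delegate;

    private ChainedIterator(final Iterator<A> delegate) {
        this.delegate = delegate;
    }

    static <A> ChainedIterator<A> of(final Iterator<A> iterator) {
        return new ChainedIterator<>(iterator);
    }

    // Keeps only the elements matching the predicate, evaluated lazily.
    ChainedIterator<A> filter(final Predicate<A> predicate) {
        final Iterator<A> source = delegate;
        return new ChainedIterator<>(new Iterator<A>() {
            private A current;
            private boolean ready;

            @Override
            public boolean hasNext() {
                while (!ready && source.hasNext()) {
                    final A candidate = source.next();
                    if (predicate.test(candidate)) {
                        current = candidate;
                        ready = true;
                    }
                }
                return ready;
            }

            @Override
            public A next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                ready = false;
                return current;
            }
        });
    }

    // Converts each element on the fly.
    <B> ChainedIterator<B> map(final Function<A, B> mapper) {
        final Iterator<A> source = delegate;
        return new ChainedIterator<>(new Iterator<B>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public B next() {
                return mapper.apply(source.next());
            }
        });
    }

    // Returns the underlying iterator once the chain is defined.
    Iterator<A> unwrap() {
        return delegate;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public A next() {
        return delegate.next();
    }

    public static void main(final String... args) {
        final Iterator<Integer> lengths = ChainedIterator.of(List.of("a", "", "bb").iterator())
                .filter(s -> !s.isEmpty())
                .map(String::length)
                .unwrap();
        lengths.forEachRemaining(System.out::println); // prints 1 then 2
    }
}
```

Nothing is read from the source until the final iterator is consumed, which keeps the chain suitable for large inputs.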