GCP Cloud Dataflow use-case patterns
Generate dummy data and write it to two MongoDB collections
public static void main(String[] args) throws Exception {
MongoDBExportOptions options = PipelineOptionsFactory
.fromArgs(args).as(MongoDBExportOptions.class);
Pipeline pipeline = Pipeline.create(options);
// total documents to be written
int totalDocs = 1000000;
// write dummy doc1 to collection1
PCollection<Document> col1 = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(totalDocs))
.apply("Generate dummy doc1", ParDo.of(new GenerateDoc1Fn()));
.apply("Write data to collection2",
MongoDbIO.write()
.withUri("xxx")
.withDatabase("db")
.withCollection("collection1");
PCollection<Document> col2 = pipeline
.apply("Generate sequence", GenerateSequence.from(0).to(totalDocs))
.apply("Generate dummy doc2", ParDo.of(new GenerateDoc2Fn()));
.apply("Write data to collection2",
MongoDbIO.write()
.withUri("xxx")
.withDatabase("db")
.withCollection("collection2");
}
public static class GenerateDoc1Fn extends DoFn<Long, Document> {
@ProcessElement
public void processElement(@Element Long index, OutputReceiver<Document> out) {
out.output(new Document()
.append("doc1_field", "doc1_value"));
.append("doc1_field", "doc1_value"));
}
}
public static class GenerateDoc2Fn extends DoFn<Long, Document> {
@ProcessElement
public void processElement(@Element Long index, OutputReceiver<Document> out) {
out.output(new Document()
.append("doc2_field", "doc2_value"));
.append("doc2_field", "doc2_value"));
}
}
ไม่มีความคิดเห็น:
แสดงความคิดเห็น