Rust Client for Apache Fluss (Incubating)
https://github.com/apache/fluss-rust.git
Rust implementation of Apache Fluss™.
Java 17 or higher (Java 8 and Java 11 are not recommended) If your cluster does not fulfill these software requirements you will need to install/upgrade it.
Fluss requires the JAVA_HOME environment variable to be set on all nodes and point to the directory of your Java installation.
tar -xzf fluss-0.8.0-incubating-bin.tgz
cd fluss-0.8.0-incubating/
You can start Fluss local cluster by running the following command:
./bin/local-cluster.sh start
After that, the Fluss local cluster is started.
After that, go the project directory, build it and run the example:
cargo build --example example-table --release
cd target/release/examples
./example-table
The example code is as follows:
#[tokio::main]
pub async fn main() -> Result<()> {
// 1: create the table;
let mut args = Args::default();
args.bootstrap_servers = "127.0.0.1:9123".to_string();
let conn_config = ConnectionConfig::from_args(args);
let conn = FlussConnection::new(conn_config).await;
let admin = conn.get_admin();
let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
.column("c1", DataTypes::int())
.column("c2", DataTypes::string())
.build(),
)
.build();
let table_path = TablePath::new("fluss".to_owned(), "rust_test".to_owned());
admin
.create_table(&table_path, &table_descriptor, true)
.await
.unwrap();
// 2: get the table
let table_info = admin.get_table_info(&table_path).await.unwrap();
print!("Get created table:\n {}\n", table_info);
// let's sleep 2 seconds to wait leader ready
thread::sleep(Duration::from_secs(2));
// 3: append log to the table
let table = conn.get_table(&table_path).await;
let append_writer = table.new_append().create_writer();
let batch = record_batch!(("c1", Int32, [1, 2, 3, 4, 5, 6]), ("c2", Utf8, ["a1", "a2", "a3", "a4", "a5", "a6"])).unwrap();
append_writer.append(batch)?;
append_writer.flush().await?;
println!("Start to scan log records......");
// 4: scan the records
let log_scanner = table.new_scan().create_log_scanner();
log_scanner.subscribe(0, 0).await;
loop {
let scan_records = log_scanner.poll(Duration::from_secs(10)).await?;
println!("Start to poll records......");
for record in scan_records {
let row = record.row();
println!(
"{{{}, {}}}@{}",
row.get_int(0),
row.get_string(1),
record.offset()
);
}
}
Ok(())
}
You can change it according to your needs, have fun!
./bin/local-cluster.sh stop
Licensed under the Apache License, Version 2.0