Cassandra 作为一门优秀的非关系型数据库,用途比较广泛。
本文不是一篇Cassandra的介绍性文章,主要讲解了如何利用Java代码操作Cassandra数据库,如果对Cassandra还不是很了解,可以参考前面的两篇教程。
与 Cassandra交互的常用的类:
Cluster类:操作集群,控制连接节点和一些属性,项目中只需要定义一个
Session类:执行CQL语句,项目中只需要定义一个
ResultSet类:每次同步执行CQL都会返回这个类
ResultSetFuture类:每次异步执行CQL都会返回这个类
PreparedStatement类:可以预定义CQL
Statement类:定义CQL,可以指定查询属性,如fetchSize
Row类:查询的结果中的一行数据
QueryBuilder类:可以动态构造CQL中的Select、Insert、Update、Delete
Cluster
Cluster cluster = Cluster.builder()
.addContactPoints("127.0.0.1","127.0.0.2")
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.build();
cluster.getConfiguration()
.getProtocolOptions()
.setCompression(ProtocolOptions.Compression.LZ4);
Cluster 四种Retry重试策略:(对数据库的操作失败后的重试)
DefaultRetryPolicy
DowngradingConsistencyRetryPolicy
与DefaultRetryPolicy一样,不同点是考虑了最终数据一致性问题
FallthroughRetryPolicy
这一策略不重试查询,但允许客户端业务逻辑实现重试
LoggingRetryPolicy
不重试查询,用来记录日志信息,info级别
Cluster两种节点重连策略:(节点连接不上重新连接)
ConstantReconnectionPolicy
固定时间间隔进行重连
ExponentialReconnectionPolicy
指数级别增加重连时间,但不会超过最大重连时间
session执行CQL查询的两种模式:
同步执行
ResultSet rs = session.execute("SELECT * FROM KEYSPACE1.CF1");
异步执行
ResultSetFuture rs = session.executeAsync("SELECT * FROM KEYSPACE1.CF1");
同步执行与异步执行结果的获取
ResultSet类代表执行CQL的结果信息,如果是查询操作,可以用如下方法获取所有数据:
List<Row> rowsList = rs.all();
ResultSetFuture是以异步非阻塞方式获取数据,可以通过如下方法获取ResultSet对象:
ResultSet rs = rsf.getUninterruptibly();
预定义CQL语句
PreparedStatement statement = getSession().prepare(
"INSERT INTO simplex.songs " +
"(id,title,album,artist,tags) " +
"VALUES (?,?,?);");
statement.bind(1231,"标题","专辑","艺术家","标签")
用QueryBuilder动态生成CQL语句
Insert insert = QueryBuilder
.insertInto("addressbook","contact") // 空间(数据库)名字,列族名字
.value("firstName","Dwayne") // 列名,列的值
.value("lastName","Garcia")
.value("email","dwayne@example.com";
ResultSet results = session.execute(insert);
下面的例子使用的 jar 为cassandra-driver-core
给出 jar 的maven dependency
<!-- https://mvnrepository.com/artifact/com.datastax.cassandra/cassandra-driver-core --> <dependency> <groupId>com.datastax.cassandra</groupId> <artifactId>cassandra-driver-core</artifactId> <version>2.1.9</version> </dependency>
下面分别给出三个例子 展示如何利用Java代码操作Cassandra 操作数据库。
2. 使用cassandra-driver-core 封装的 QueryBuilder 操作数据库
3. 使用Preparement sql 操作 Cassandra数据库
package cassandra; import java.util.List; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; /** * Hello world! * */ public class Cassandra_CQL { public static void main( String[] args ) { Cluster cluster = null; Session session = null; try { //定义一个Cluster类 cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); //需要获取Session对象 session = cluster.connect(); //创建键空间 String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy','replication_factor': 1}"; session.execute(createKeySpaceCQL); //创建列族 String createTableCQL = "create table if not exists keyspace1.student(name varchar primary key,age int)"; session.execute(createTableCQL); //插入数据 String insertCQL = "insert into keyspace1.student(name,age) values('zhangsan',20)"; session.execute(insertCQL); //查询,修改,删除数据 String queryCQL = "select * from keyspace1.student"; ResultSet rs = session.execute(queryCQL); List<Row> dataList = rs.all(); for (Row row : dataList) { System.out.println("==>name: "+ row.getString("name")); System.out.println("==>age: "+row.getInt("age")); } //修改 String updateCQL = "update keyspace1.student set age=22 where name='zhangsan'"; session.execute(updateCQL); rs = session.execute(queryCQL); dataList = rs.all(); for (Row row : dataList) { System.out.println("==>name: "+ row.getString("name")); System.out.println("==>age: "+row.getInt("age")); } //删除数据 String deleteCQL = "delete from keyspace1.student where name='zhangsan'"; session.execute(deleteCQL); rs = session.execute(queryCQL); dataList = rs.all(); for (Row row : dataList) { System.out.println("==>name: "+ row.getString("name")); System.out.println("==>age: "+row.getInt("age")); } } catch (Exception e) { e.printStackTrace(); }finally{ //关闭Session和Cluster session.close(); cluster.close(); } } }
2.使用cassandra-driver-core 封装的 QueryBuilder 操作数据库
package cassandra; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.Insert; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select.Where; public class Cassandra_builder { public static void main( String[] args ) { Cluster cluster = null; Session session = null; try { //定义一个Cluster类 cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); //需要获取Session对象 session = cluster.connect(); //创建键空间 String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy',age int)"; session.execute(createTableCQL); //新增数据 Insert insert = QueryBuilder.insertInto("keyspace1","student").value("name","lisi").value("age",11); session.execute(insert); System.out.println("插入语句: "+insert); System.out.println("查询数据"); //查询数据 Where select = QueryBuilder.select().all().from("keyspace1","student").where(QueryBuilder.eq("name","lisi")); ResultSet rs = session.execute(select); for(Row row : rs.all()){ System.out.println("=>name: "+row.getString("name")); System.out.println("=>age: "+row.getInt("age")); } System.out.println("查询语句: "+select); System.out.println("修改数据"); //修改数据 com.datastax.driver.core.querybuilder.Update.Where update = QueryBuilder.update("keyspace1","student").with(QueryBuilder.set("age",21)).where(QueryBuilder.eq("name","lisi")); session.execute(update); rs = session.execute(select); for(Row row : rs.all()){ System.out.println("=>name: "+row.getString("name")); System.out.println("=>age: "+row.getInt("age")); } System.out.println("修改语句: "+update); System.out.println("删除数据"); //删除数据 com.datastax.driver.core.querybuilder.Delete.Where delete = QueryBuilder.delete().from("keyspace1","lisi")); session.execute(delete); rs = session.execute(select); for(Row row : rs.all()){ System.out.println("=>name: "+row.getString("name")); System.out.println("=>age: "+row.getInt("age")); } System.out.println("删除语句: "+delete); } catch (Exception e) { e.printStackTrace(); }finally{ //关闭Session和Cluster session.close(); cluster.close(); } } }
3.使用Preparement sql 操作 Cassandra数据库
package cassandra; import java.util.List; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; public class Cassandra_preCQL { public static void main(String[] args) { Cluster cluster = null; Session session = null; try { //定义一个Cluster类 cluster = Cluster.builder().addContactPoint("127.0.0.1").build(); //需要获取Session对象 session = cluster.connect(); //创建键空间 String createKeySpaceCQL = "create keyspace if not exists keyspace1 with replication={'class':'SimpleStrategy',age int)"; session.execute(createTableCQL); //插入数据 PreparedStatement statement = session.prepare("insert into keyspace1.student(name,age) values(?,?)"); session.execute(statement.bind("zhangsan",40)); //查询,删除数据 String queryCQL = "select * from keyspace1.student"; ResultSet rs = session.execute(queryCQL); List<Row> dataList = rs.all(); for (Row row : dataList) { System.out.println("==>name: "+ row.getString("name")); System.out.println("==>age: "+row.getInt("age")); } } catch (Exception e) { e.printStackTrace(); }finally{ //关闭Session和Cluster session.close(); cluster.close(); } } }