Multithread test app help
In a job interview they requested to me to develop an application that should be a server and a client. The server should allow several connections at the same time to increase/decrease a counter. The counter should be persisted allowing the application to recover last counter number from crash/normal exit. The communication should be done through TCP and the protocol should be defined. I already did the app but, as I am no experienced with multithreadind I would like to check whether I did it right. :)
First I did a server that will receive the calls, each time a call is received, a new thread is created to process the request by using FixedThreadPool. In order to work with the file that will persist the counter I have defined a SynchronousQueue, it will ensure that the file is modified once at a time for each requested.
Code:
public class Server extends Thread {
private ServerSocket serverSocket = null;
private ExecutorService pool = null;
private SynchronousQueue bq = null;
public Server(int port, int poolSize) throws IOException {
this.serverSocket = new ServerSocket(port);
this.pool = Executors.newFixedThreadPool(poolSize);
this.bq = new SynchronousQueue(true);
}
@Override
public void run() {
try {
PersistCounter pc = new PersistCounter(this.bq);
pc.start();
for (;;) {
pool.execute(new Operation(serverSocket.accept(), this.bq));
}
} catch (IOException ex) {
if (Boolean.FALSE.equals(this.serverSocket.isClosed())) {
try {
this.serverSocket.close();
} catch (IOException ex1) {
ex1.printStackTrace();
}
}
pool.shutdown();
}
}
}
Each processing the request will check whether the received command (INC/DEC) is right or wrong, returning OK or ERR to client. If comand is ok, will put the command in the queue.
Code:
public class Operation extends Thread {
private Socket socket = null;
private SynchronousQueue bq = null;
public Operation(Socket socket, SynchronousQueue bq) {
this.socket = socket;
this.bq = bq;
}
@Override
public void run() {
try {
BufferedReader in = new BufferedReader(new InputStreamReader(
this.socket.getInputStream()));
PrintWriter pw = new PrintWriter(this.socket.getOutputStream());
String command = in.readLine();
System.out.println("Received command is: " + command);
if (Boolean.TRUE.equals("INC".equalsIgnoreCase(command.trim()))) {
try {
System.out.println("Putting on queue...");
bq.put("INC");
pw.println("OK");
pw.flush();
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
pw.println("ERR");
pw.flush();
}
} else if (Boolean.TRUE.equals("DEC"
.equalsIgnoreCase(command.trim()))) {
try {
bq.put("DEC");
pw.println("OK");
pw.flush();
} catch (Exception e) {
pw.println("ERR");
pw.flush();
System.out.println("Error: " + e.getMessage());
}
} else {
System.out.println("Wrong command..");
pw.println("ERR");
}
System.out.println("Closing resources");
in.close();
pw.close();
this.socket.close();
} catch (SocketException se) {
System.out.println("Error: " + se.getMessage());
} catch (IOException ex) {
System.out.println("Error: " + ex.getMessage());
} finally {
try {
System.out.println("Closing socket");
this.socket.close();
} catch (IOException ex) {
System.out.println("Error closing socket..." + ex.getMessage());
}
}
}
}
The real operation on the counter is performed by a class that works as a consumer of the queue:
Code:
public class PersistCounter extends Thread {
private SynchronousQueue bq = null;
private File file = new File("./data.txt");
public PersistCounter(SynchronousQueue bq) {
this.bq = bq;
try {
checkFile(this.file);
} catch (Exception ex) {
System.out.println("Error: " + ex.getMessage());
}
}
public void run() {
try {
while (true) {
String operation = (String) this.bq.take();
doOperation(operation, this.file);
}
} catch (Exception e) {
System.out.println("Error: " + e.getMessage());
}
}
private void doOperation(String operation, File file) throws Exception {
// Read file
FileChannel fc = new FileInputStream(file).getChannel();
ByteBuffer buff = ByteBuffer.allocate(10);
fc.read(buff);
// Convert from ByteBuffer to String.
byte[] bArray = new byte[buff.limit()];
for (int x = 0; x < buff.limit() - 1; x++) {
bArray[x] = buff.get(x);
}
// Get current value.
int current = Integer.parseInt(new String(bArray).trim());
fc.close();
System.out.println("Current value: " + current);
if ("INC".equalsIgnoreCase(operation)) {
current++;
} else {
current--;
}
System.out.println("New value: " + current);
// Store new value on file.
fc = new FileOutputStream(file).getChannel();
fc.write(ByteBuffer.wrap(new Integer(current).toString().getBytes()));
fc.close();
}
/**
* Will check whether the file required to persist counter exists or not.
* If file does not exists, it will be created and initialized to zero.
*
* @param file
* @throws Exception
*/
private void checkFile(File file) throws Exception {
if (Boolean.FALSE.equals(file.exists())) {
System.out.println("File does not exists. Will be created!");
PrintWriter pw = new PrintWriter(file);
// Initialize file with 0
pw.print("0");
pw.flush();
pw.close();
} else {
System.out.println("File: " + file.getAbsolutePath()
+ " already exists!");
}
}
}
The client is very silly and just work sending commands to the server.
Is my code right? I mean, is it thread safe? I have tested it and it worked fine but I would like whether I did it right or could be a better way... :)
Thanks in advance
C