Author: klystr
(2009/12/07 10:44) About 2 years ago
Final
21
System.out.println("[" + this.getqName() + "] Item was added");
22
this.queue.add(input);
23
}
24
25
public void deleteFromQueue(String queuedObject) {
26
this.queue.remove(queuedObject);
27
...
47
48
49
/**
50
* Geeft Array terug van hele queue
51
*
52
* @return Lijst met alle items
53
*/
54
public LinkedList<String> peekEntireQueue() {
55
return new LinkedList<String>(this.queue);
56
57
58
* Removes latest item from the Queue and removes it from the Queue.
59
60
* @return File contents
61
return this.queue.poll();
65
66
67
62
63
64
68
public void setqName(String qName) {
69
this.qName = qName;
70
5
import java.io.InputStream;
6
import java.net.SocketException;
7
8
import org.apache.commons.net.ftp.FTPClient;
9
import org.apache.commons.net.ftp.FTPFile;
10
11
public class ESBFTP {
12
13
private String ftpAddress, ftpUserName, ftpPassword;
34
31
35
32
36
33
* Haalt bestanden uit directory
* @param directory
37
* doeldirectory
38
* @return Array van FTPFiles
39
40
public FTPFile[] getFilesFromDir(String directory) {
41
this.makeConnection();
42
FTPFile[] output = null;
43
try {
44
output = this.ftpClient.listFiles(directory);
45
} catch (IOException e) {
46
e.printStackTrace();
this.terminateConnection();
return output;
* Maakt verbinding met de FTP-server en slaat deze verbinding op in de
* lokale variabele ftpClient
* Sluit connectie met server die nu in lokale variabele ftpClient staat
* Haalt opgegeven bestand van de server er verwijderd deze.
71
* @param filePath
72
* Pad + bestandsnaam
73
* @return Bestandsinhoud
74
private void terminateConnection() {
75
public String pollFile(String filePath) {
76
77
ByteArrayOutputStream outputStream = null;
78
this.ftpClient.disconnect();
79
outputStream = new ByteArrayOutputStream();
80
this.ftpClient.retrieveFile(filePath, outputStream);
81
// this.ftpClient.dele(filePath);
82
83
84
this.ftpClient = null;
85
86
87
return outputStream.toString();
88
89
90
108
109
110
111
112
91
113
92
93
94
114
95
115
96
116
97
117
98
118
99
119
100
101
120
102
103
104
105
106
107
121
122
123
124
import java.io.StringReader;
import java.io.StringWriter;
public class FTPApp2QChannel extends App2Q {
private String ftpAddress;
16
private String ftpPassword;
14
17
private ESBFTP ftpClient;
15
18
19
@Override
public String getFtpAddress() {
20
public void run() {
return ftpAddress;
while (true) {
this.readAndProcessFTP();
Thread.sleep(10000);
} catch (InterruptedException e) {
28
29
30
public String getFtpInitDir() {
return ftpInitDir;
public String getFtpPassword() {
return ftpPassword;
public String getFtpUserName() {
return ftpUserName;
private void readAndProcessFTP() {
// Maak FTP-verbinding klaar
this.ftpClient = new ESBFTP(this.getFtpAddress(), this.getFtpUserName(), this
private String undoFilter(String fileContents) {
StringWriter stringWriter = new StringWriter();
FromIBGFilter fromFilter = new FromIBGFilter();
fromFilter.doFilter(new StringReader(fileContents), stringWriter);
return stringWriter.toString();
public void setFtpAddress(String ftpAddress) {
this.ftpAddress = ftpAddress;
public void setFtpInitDir(String ftpInitDir) {
this.ftpInitDir = ftpInitDir;
public void setFtpPassword(String ftpPassword) {
this.ftpPassword = ftpPassword;
public void setFtpUserName(String ftpUserName) {
this.ftpUserName = ftpUserName;
this.httpObject = httpq2AppChannel;
private String findId(String inQueue) {
File tmpFile = new File("C:/queuetemp.tmp");
FileWriter fw = new FileWriter("C:/queuetemp.tmp");
fw.write(inQueue);
fw.close();
} catch (Exception e) {
// Doe niets
Xml xmlParser = new Xml("C:/queuetemp.tmp", "message");
String reqId = xmlParser.child("id").content();
tmpFile.delete();
return reqId;
private String getHttpQueueName() {
return this.httpObject.getSourceQueue().getqName();
private String getId(String queryType) {
int start = queryType.indexOf("?ID=") + 4;
queryType = queryType.replace(" HTTP/1.1", "");
return header.substring(5, header.indexOf(" HTTP/1.1"));
private void reply(String queryType) {
if (queryType.indexOf("?ID=") > 0) { // Specifiek ID
this.replyId(this.getId(queryType));
} else { // Overzicht
this.replyList();
private void initListen() {
System.out.println("[HTTPQ2AppChannel] Connection received from "
+ socket.getInetAddress().getHostName());
private void replyList() {
LinkedList<String> queued = this.httpObject.getSourceQueue().peekEntireQueue();
if (queued.size() > 0) {
LinkedList<String> idOutput = new LinkedList<String>();
int contentLength = 0;
for (String inQueue : queued) {
String inputId = "ID=" + this.findId(inQueue) + "\n";
idOutput.add(inputId);
contentLength += inputId.length();
outPutWriter.write("HTTP/1.1 200 OK\n");
outPutWriter.write("Content-Length=" + contentLength + "\n");
for (String id : idOutput) {
System.out.println(id);
outPutWriter.write(id.trim());
125
126
127
128
129
130
131
132
133
134
private void replyId(String queryId) {
135
136
150
outPutWriter.write("HTTP/1.1 404 Not Found");
151
152
137
138
139
140
141
142
143
144
145
146
147
148
149
153
154
155
// Open connection
return header.substring(6, header.indexOf(" HTTP/1.1"));
private String getReqeustInfo(String inputLine) {
// Queuenaam / Content length
return inputLine.substring(5).trim();
private void handleSocketRequest(String inputLine) {
// Pak socket
SocketApp2QChannel inputSocket;
inputSocket = (SocketApp2QChannel) ESBServer.esbChannels.get("LocatorIn");
// Header info
String header = getReqeustInfo(inputLine);
String[] info = header.split("/");
// Queue controleren
if (this.getHttpQueueName(inputSocket).equals(info[0].trim())) {
String readline = inputReader.readLine();
while (readline != null) {
inputSocket.getDestinationQueue().addToQueue(inputLine);
outPutWriter.println("ACK");
break;
// Do nothing
outPutWriter.println("NACK");
} else {
private void handleValidRequest() {
public class ESBServer {
public static HashMap<String, Channel> esbChannels;
public static void main(String[] args) {
new ESBServer("C:/Users/Martijn/workspace/ESB/src/hanze/ga/wt3/xml/configuration.xml");
private HashMap<String, ESBQueue> esbQueues;
@SuppressWarnings("static-access")
this.buildFTPApp2Q(channelDef);
* Zorgt ervoor dat een Channel de juiste bron- en bestemmingsQueue heeft.
private void linkQueueToChannel() {
for (ESBQueue esbQueue : this.queues.values()) {
// Link receiver
Channel receiver = esbQueue.getqReceiverChannel();
if (receiver instanceof App2Q) {
((App2Q) receiver).setDestinationQueue(esbQueue);
} else if (receiver instanceof Q2App) {
((Q2App) receiver).setSourceQueue(esbQueue);
// Link sender
Channel sender = esbQueue.getqSenderChannel();
if (sender instanceof App2Q) {
((App2Q) sender).setDestinationQueue(esbQueue);
} else if (sender instanceof Q2App) {
((Q2App) sender).setSourceQueue(esbQueue);
private void buildFTPApp2Q(Xml channel) {
FTPApp2QChannel ftp = new FTPApp2QChannel();
// Globale settings
ftp.setChannelName(channel.child("name").content());
// FTP Adapter waarden
Xml adapterNode = channel.child("adapter");
ftp.setFtpAddress(adapterNode.child("address").content());
ftp.setFtpInitDir(adapterNode.child("init-dir").content());
ftp.setFtpPassword(adapterNode.child("password").content());
ftp.setFtpUserName(adapterNode.child("username").content());
ftp.setChannelPort(adapterNode.child("port").content());
// Zet in lijst
this.channels.put(channel.child("name").content(), ftp);
* Bouwen van ESB aan de hand van XML-bestand.
private void buildFTPQ2App(Xml channel) {
FTPQ2AppChannel ftp = new FTPQ2AppChannel();
271
return queues;
247
272
248
273
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
274
System.out.println("[" + this.getqName() + "] Item was added");System.out.println("[" + this.getqName() + "] Item was added");public void deleteFromQueue(String queuedObject) {public void deleteFromQueue(String queuedObject) {public LinkedList<String> peekEntireQueue() {/*** Geeft Array terug van hele queue** @return Lijst met alle items*/public LinkedList<String> peekEntireQueue() {return new LinkedList<String>(this.queue);}public void setqName(String qName) {public void setqName(String qName) {import org.apache.commons.net.ftp.FTPClient;import org.apache.commons.net.ftp.FTPFile;public class ESBFTP {public class ESBFTP {public FTPFile[] getFilesFromDir(String directory) {try {} catch (IOException e) {* Sluit connectie met server die nu in lokale variabele ftpClient staat* Haalt opgegeven bestand van de server er verwijderd deze.** @param filePath* Pad + bestandsnaam* @return Bestandsinhoudprivate void terminateConnection() {public String pollFile(String filePath) {this.makeConnection();ByteArrayOutputStream outputStream = null;try {try {this.ftpClient.disconnect();outputStream = new ByteArrayOutputStream();this.ftpClient.retrieveFile(filePath, outputStream);// this.ftpClient.dele(filePath);} catch (IOException e) {} catch (IOException e) {this.ftpClient = null;this.terminateConnection();return outputStream.toString();* Haalt bestanden uit directory* Sluit connectie met server die nu in lokale variabele ftpClient staat** @param directory* doeldirectory* @return Array van FTPFilespublic FTPFile[] getFilesFromDir(String directory) {private void terminateConnection() {this.makeConnection();FTPFile[] output = null;try {try {output = this.ftpClient.listFiles(directory);this.ftpClient.disconnect();} catch (IOException e) {} catch (IOException e) {this.terminateConnection();this.ftpClient = null;return output;/*** Haalt opgegeven bestand van de server er verwijderd deze.** @param filePath* Pad + bestandsnaam* @return Bestandsinhoud*/public String pollFile(String filePath) {this.makeConnection();ByteArrayOutputStream outputStream = null;try {outputStream = new ByteArrayOutputStream();this.ftpClient.retrieveFile(filePath, outputStream);// this.ftpClient.dele(filePath);} catch (IOException e) {e.printStackTrace();}this.terminateConnection();return outputStream.toString();}import org.apache.commons.net.ftp.FTPFile;public class FTPApp2QChannel extends App2Q {public class FTPApp2QChannel extends App2Q {@Overridepublic String getFtpAddress() {public void run() {return ftpAddress;try {while (true) {this.readAndProcessFTP();Thread.sleep(10000);}} catch (InterruptedException e) {e.printStackTrace();}public String getFtpInitDir() {public String getFtpPassword() {public String getFtpUserName() {private void readAndProcessFTP() {private void readAndProcessFTP() {private String undoFilter(String fileContents) {@OverrideStringWriter stringWriter = new StringWriter();public void run() {FromIBGFilter fromFilter = new FromIBGFilter();try {fromFilter.doFilter(new StringReader(fileContents), stringWriter);while (true) {this.readAndProcessFTP();return stringWriter.toString();Thread.sleep(10000);}} catch (InterruptedException e) {e.printStackTrace();}public String getFtpAddress() {return ftpAddress;}public void setFtpAddress(String ftpAddress) {public void setFtpAddress(String ftpAddress) {public String getFtpInitDir() {return ftpInitDir;}public void setFtpInitDir(String ftpInitDir) {public void setFtpInitDir(String ftpInitDir) {public String getFtpUserName() {public void setFtpPassword(String ftpPassword) {return ftpUserName;this.ftpPassword = ftpPassword;public void setFtpUserName(String ftpUserName) {public void setFtpUserName(String ftpUserName) {public String getFtpPassword() {private String undoFilter(String fileContents) {return ftpPassword;StringWriter stringWriter = new StringWriter();}FromIBGFilter fromFilter = new FromIBGFilter();fromFilter.doFilter(new StringReader(fileContents), stringWriter);public void setFtpPassword(String ftpPassword) {return stringWriter.toString();this.ftpPassword = ftpPassword;private String findId(String inQueue) {File tmpFile = new File("C:/queuetemp.tmp");try {FileWriter fw = new FileWriter("C:/queuetemp.tmp");} catch (Exception e) {Xml xmlParser = new Xml("C:/queuetemp.tmp", "message");String reqId = xmlParser.child("id").content();private String getHttpQueueName() {private String getId(String queryType) {private String getId(String queryType) {int start = queryType.indexOf("?ID=") + 4;int start = queryType.indexOf("?ID=") + 4;queryType = queryType.replace(" HTTP/1.1", "");queryType = queryType.replace(" HTTP/1.1", "");return header.substring(5, header.indexOf(" HTTP/1.1"));return header.substring(5, header.indexOf(" HTTP/1.1"));private void reply(String queryType) {if (queryType.indexOf("?ID=") > 0) { // Specifiek IDthis.replyId(this.getId(queryType));} else { // Overzichtthis.replyList();}}private String getHttpQueueName() {return this.httpObject.getSourceQueue().getqName();}private void initListen() {private void initListen() {System.out.println("[HTTPQ2AppChannel] Connection received from "System.out.println("[HTTPQ2AppChannel] Connection received from "private void replyList() {private void reply(String queryType) {LinkedList<String> queued = this.httpObject.getSourceQueue().peekEntireQueue();if (queryType.indexOf("?ID=") > 0) { // Specifiek IDif (queued.size() > 0) {this.replyId(this.getId(queryType));LinkedList<String> idOutput = new LinkedList<String>();} else { // Overzichtint contentLength = 0;this.replyList();for (String inQueue : queued) {String inputId = "ID=" + this.findId(inQueue) + "\n";idOutput.add(inputId);contentLength += inputId.length();}outPutWriter.write("HTTP/1.1 200 OK\n");outPutWriter.write("Content-Length=" + contentLength + "\n");for (String id : idOutput) {System.out.println(id);outPutWriter.write(id.trim());}private String findId(String inQueue) {File tmpFile = new File("C:/queuetemp.tmp");try {FileWriter fw = new FileWriter("C:/queuetemp.tmp");fw.write(inQueue);fw.close();} catch (Exception e) {// Doe niets}Xml xmlParser = new Xml("C:/queuetemp.tmp", "message");String reqId = xmlParser.child("id").content();tmpFile.delete();return reqId;}private void replyId(String queryId) {private void replyId(String queryId) {if (queued.size() > 0) {if (queued.size() > 0) {outPutWriter.write("HTTP/1.1 404 Not Found");outPutWriter.write("HTTP/1.1 404 Not Found");private void replyList() {if (queued.size() > 0) {for (String inQueue : queued) {outPutWriter.write("HTTP/1.1 200 OK\n");outPutWriter.write("Content-Length=" + contentLength + "\n");for (String id : idOutput) {public void run() {public void run() {return header.substring(6, header.indexOf(" HTTP/1.1"));return header.substring(6, header.indexOf(" HTTP/1.1"));private String getReqeustInfo(String inputLine) {private void handleSocketRequest(String inputLine) {inputSocket = (SocketApp2QChannel) ESBServer.esbChannels.get("LocatorIn");String[] info = header.split("/");if (this.getHttpQueueName(inputSocket).equals(info[0].trim())) {try {while (readline != null) {outPutWriter.println("ACK");} catch (Exception e) {outPutWriter.println("NACK");} else {outPutWriter.println("NACK");private void handleValidRequest() {private void handleValidRequest() {try {try {private void handleSocketRequest(String inputLine) {// Pak socketSocketApp2QChannel inputSocket;inputSocket = (SocketApp2QChannel) ESBServer.esbChannels.get("LocatorIn");// Header infoString header = getReqeustInfo(inputLine);String[] info = header.split("/");// Queue controlerenif (this.getHttpQueueName(inputSocket).equals(info[0].trim())) {try {String readline = inputReader.readLine();while (readline != null) {inputSocket.getDestinationQueue().addToQueue(inputLine);outPutWriter.println("ACK");break;}} catch (Exception e) {// Do nothingoutPutWriter.println("NACK");}} else {outPutWriter.println("NACK");}}private String getReqeustInfo(String inputLine) {// Queuenaam / Content lengthreturn inputLine.substring(5).trim();}public void run() {public void run() {public class ESBServer {public class ESBServer {public static void main(String[] args) {public static void main(String[] args) {new ESBServer("C:/Users/Martijn/workspace/ESB/src/hanze/ga/wt3/xml/configuration.xml");new ESBServer("C:/Users/Martijn/workspace/ESB/src/hanze/ga/wt3/xml/configuration.xml");public static HashMap<String, Channel> esbChannels;@SuppressWarnings("static-access")@SuppressWarnings("static-access")/*** Zorgt ervoor dat een Channel de juiste bron- en bestemmingsQueue heeft.*/private void linkQueueToChannel() {for (ESBQueue esbQueue : this.queues.values()) {// Link receiverChannel receiver = esbQueue.getqReceiverChannel();if (receiver instanceof App2Q) {((App2Q) receiver).setDestinationQueue(esbQueue);} else if (receiver instanceof Q2App) {((Q2App) receiver).setSourceQueue(esbQueue);}// Link senderChannel sender = esbQueue.getqSenderChannel();if (sender instanceof App2Q) {((App2Q) sender).setDestinationQueue(esbQueue);} else if (sender instanceof Q2App) {((Q2App) sender).setSourceQueue(esbQueue);}}}private void buildFTPApp2Q(Xml channel) {FTPApp2QChannel ftp = new FTPApp2QChannel();// Globale settingsftp.setChannelName(channel.child("name").content());// FTP Adapter waardenXml adapterNode = channel.child("adapter");ftp.setFtpAddress(adapterNode.child("address").content());ftp.setFtpInitDir(adapterNode.child("init-dir").content());ftp.setFtpPassword(adapterNode.child("password").content());ftp.setFtpUserName(adapterNode.child("username").content());ftp.setChannelPort(adapterNode.child("port").content());// Zet in lijstthis.channels.put(channel.child("name").content(), ftp);}private void buildFTPApp2Q(Xml channel) {ftp.setChannelName(channel.child("name").content());Xml adapterNode = channel.child("adapter");ftp.setFtpAddress(adapterNode.child("address").content());ftp.setFtpInitDir(adapterNode.child("init-dir").content());ftp.setFtpPassword(adapterNode.child("password").content());ftp.setFtpUserName(adapterNode.child("username").content());ftp.setChannelPort(adapterNode.child("port").content());this.channels.put(channel.child("name").content(), ftp);private void buildFTPQ2App(Xml channel) {private void buildFTPQ2App(Xml channel) {private void linkQueueToChannel() {for (ESBQueue esbQueue : this.queues.values()) {if (receiver instanceof App2Q) {} else if (receiver instanceof Q2App) {if (sender instanceof App2Q) {} else if (sender instanceof Q2App) {