Skip to content

Commit

Permalink
RANGER-4837: Fix flaky test by iterating over datewise subfolders for…
Browse files Browse the repository at this point in the history
… counting orc logs (#337)
  • Loading branch information
fateh288 authored Jul 15, 2024
1 parent bc091c5 commit c9361dc
Showing 1 changed file with 39 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.file.Paths;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -735,7 +736,7 @@ public void testAuditFileQueueSpoolORC(){
try {
Thread.sleep(40000);
} catch (InterruptedException e) {
System.out.println(e);
logger.error(e.getMessage());
}
queue.waitToComplete();
assertTrue("File created", logFile.exists());
Expand All @@ -745,7 +746,9 @@ public void testAuditFileQueueSpoolORC(){
@Test
public void testAuditFileQueueSpoolORCRollover(){
String appType = "test";
int messageToSend = 100000;
int messageToSend = 1000;
int preRolloverMessagesCount = (int)(0.8*messageToSend);
int postRolloverMessagesCount = messageToSend - preRolloverMessagesCount;
String spoolFolderName = "target/spool";
String logFolderName = "target/testAuditFileQueueSpoolORC";
try {
Expand All @@ -760,11 +763,7 @@ public void testAuditFileQueueSpoolORCRollover(){
}
assertTrue(Files.notExists(Paths.get(spoolFolderName)));
assertTrue(Files.notExists(Paths.get(logFolderName)));
String subdir = appType + "/" + LocalDate.now().toString().replace("-","");
File logFolder = new File(logFolderName);
File logSubfolder = new File(logFolder, subdir);
String logFileName = "test_ranger_audit.orc";
File logFile = new File(logSubfolder, logFileName);
Properties props = new Properties();
props.put(AuditProviderFactory.AUDIT_IS_ENABLED_PROP, "true");
String hdfsPropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".hdfs";
Expand All @@ -774,6 +773,7 @@ public void testAuditFileQueueSpoolORCRollover(){
"%app-type%_ranger_audit.orc");
String orcPrefix = hdfsPropPrefix + ".orc";
props.put(orcPrefix+".compression","snappy");
//large numbers used here to ensure that file rollover happens because of file rollover seconds and not orc file /related props
props.put(orcPrefix+".buffersize",""+100000000000000L);
props.put(orcPrefix+".stripesize",""+100000000000000L);
props.put(hdfsPropPrefix + ".batch.queuetype","filequeue");
Expand All @@ -782,35 +782,53 @@ public void testAuditFileQueueSpoolORCRollover(){
String fileSpoolPrefix = filequeuePrefix + ".filespool";
props.put(fileSpoolPrefix+".dir",spoolFolderName);
props.put(fileSpoolPrefix+".buffer.size",""+100000000000000L);
props.put(fileSpoolPrefix+".file.rollover.sec",""+300);
props.put(fileSpoolPrefix+".file.rollover.sec",""+5);
AuditProviderFactory factory = new AuditProviderFactory();
factory.init(props, appType);
AuditHandler queue = factory.getAuditProvider();
for (int i = 0; i < messageToSend; i++) {
for (int i = 0; i < preRolloverMessagesCount; i++) {
queue.log(createEvent());
try {
Thread.sleep(5);
Thread.sleep(10);
} catch (InterruptedException e) {
System.out.println(e);
logger.error(e.getMessage());
}
}
//wait for rollover to happen
try {
Thread.sleep(40000);
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println(e);
logger.error(e.getMessage());
}
//send some more logs
for (int i = 0; i < postRolloverMessagesCount; i++) {
queue.log(createEvent());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
queue.waitToComplete();
assertTrue("File created", logFile.exists());
File[] listOfFiles = logSubfolder.listFiles();
int totalLogsOrc = 0;
if (listOfFiles != null){
for(File f : listOfFiles){
if (f.getName().endsWith(".orc")){
totalLogsOrc += getOrcFileRowCount(f.getPath());
File appSubFolder = new File(logFolder,appType);
String[] datewiseSubfolders = appSubFolder.list();
logger.info("subfolder list="+ Arrays.toString(datewiseSubfolders));
if (datewiseSubfolders != null) {
for (String dateSubfolder : datewiseSubfolders){
File logSubfolder = new File(appSubFolder, dateSubfolder);
File[] listOfFiles = logSubfolder.listFiles();
if (listOfFiles != null){
for(File f : listOfFiles){
if (f.getName().endsWith(".orc")){
logger.info("Reading orc file:"+f.getName());
totalLogsOrc += getOrcFileRowCount(f.getPath());
}
}
}
}
}
System.out.println("Number of logs in orc="+totalLogsOrc);
logger.info("Number of logs in orc="+totalLogsOrc);
long totalLogsArchive = 0;

try {
Expand All @@ -826,7 +844,7 @@ public void testAuditFileQueueSpoolORCRollover(){
} catch (IOException e) {
throw new RuntimeException(e);
}
System.out.println("Number of logs in archive:"+totalLogsArchive);
logger.info("Number of logs in archive:"+totalLogsArchive);
assertEquals(totalLogsOrc, totalLogsArchive);

long notYetConvertedToORCLogsCount = 0;
Expand All @@ -848,7 +866,7 @@ public void testAuditFileQueueSpoolORCRollover(){
catch (IOException e){
throw new RuntimeException(e);
}
System.out.println("Number of logs not converted to ORC:"+notYetConvertedToORCLogsCount);
logger.info("Number of logs not converted to ORC:"+notYetConvertedToORCLogsCount);
assertEquals(messageToSend, notYetConvertedToORCLogsCount+totalLogsArchive);
}

Expand Down

0 comments on commit c9361dc

Please sign in to comment.