Orekhov-AV.ru
Поиск работы: тестовое задание
(получено вечером 14 мая 2012 года, выполнено утром 21 мая 2012 года)

Тестовое задание: сборщик и сервер хранения данных из /var/log/messages.

Сборщик данных.

Должен содержать 2 потока выполнения. Первый висит и ждет обновления файла, по обновлении парсит и передает второму. Второй - собирает записи до определенного лимита (время или количество записей), затем передает на сервер.

Сервер.

При получении сохраняет данные в БД sqlite. Поддерживает обращение по http с авторизацией (любого типа) и отдаёт html-страницу с данными из базы в виде таблицы.

Язык: C, можно использовать доступные библиотеки.

Моё решение:

Архив (исходники, Makefile, README).

Краткий путеводитель по исходникам:

Makefile :

  1
  2 CC = gcc
  3 ECHO = env echo -ne
  4
  5 all : server.bin client.bin
  6
  7
  8
  9

Почти все параметры задаются на этапе компиляции в Makefile как директивы -D компилятора gcc:

 10 DEFS += -DSQLITE_DB=\"./messages.db\"
 11
 12
 13 DEFS += -DSERVERDATA_ADDR_STR=\"127.0.0.1\"
 14 DEFS += -DSERVERDATA_PORT_STR=\"8081\"
 15 DEFS += -DSERVERDATA_MAX_CONNECTIONS=5
 16
 17 DEFS += -DMAX_SEND_ERRORS=5
 18
 19 DEFS += -DCLIENTID_COMMAND_STR=\"CLIENTID:\ \"
 20 DEFS += -DHEADER_COMMAND_STR=\"HEADER:\ \"
 21 DEFS += -DCONTENT_COMMAND_STR=\"CONTENT:\ \"
 22
 23 DEFS += -DSERVERHTTP_ADDR_STR=\"127.0.0.1\"
 24 DEFS += -DSERVERHTTP_PORT_STR=\"8082\"
 25 DEFS += -DSERVERHTTP_MAX_CONNECTIONS=5
 26 DEFS += -DSERVERHTTP_REALM=\"HttpVarLogMessages\"
 27 DEFS += -DSERVERHTTP_USER=\"Admin\"
 28 DEFS += -DSERVERHTTP_PASSWORD=\"code-tester\"
 29
 30 DEFS += -DMAX_STRLEN=1000
 31 DEFS += -DMAX_BUFFER=10000
 32

Символьные коды выхода из программы тоже задаются через директивы -D:

 33 DEFS += -DEXIT_OK=0
 34 DEFS += -DEXIT_GETADDRINFO=1
 35 DEFS += -DEXIT_SOCKET=2
 36 DEFS += -DEXIT_BIND=3
 37 DEFS += -DEXIT_LISTEN=4
 38 DEFS += -DEXIT_FORK=5
 39 DEFS += -DEXIT_ACCEPT=6
 40 DEFS += -DEXIT_SIGACTION=7
 41 DEFS += -DEXIT_SEND_ERRORS=8
 42 DEFS += -DEXIT_CONNECT=9
 43 DEFS += -DEXIT_THREAD=10
 44

Символьные коды возврата из некоторых функций:

 45 DEFS += -DRET_PARSE_FAILED=0
 46 DEFS += -DRET_PARSE_COMPLETE=1
 47 DEFS += -DRET_NEED_INPUT=2
 48 DEFS += -DRET_DB_FAILED=3
 49 DEFS += -DRET_DB_SQLFAILED=4
 50
 51 DEFS += -DRET_SEND_OK=0
 52 DEFS += -DRET_SEND_FAILED=1
 53

Ещё несколько параметров:

 54
 55 DEFS += -DMAX_PACKETS=5
 56 DEFS += -DMAX_TIMEOUT=6
 57
 58 DEFS += -DCLIENTID_NAME=\"Tester\"
 59

Цели для компиляции и линковки:

 60 base64.o : base64.c Makefile
 61         @$(ECHO) "Compiling base64.c\n"
 62         @$(CC) -c -o base64.o base64.c $(DEFS)
 63
 64 sighandle.o : sighandle.c Makefile
 65         @$(ECHO) "Compiling sighandle.c\n"
 66         @$(CC) -c -o sighandle.o sighandle.c $(DEFS)
 67
 68 httpserver.o : httpserver.c Makefile
 69         @$(ECHO) "Compiling httpserver.c\n"
 70         @$(CC) -c -o httpserver.o httpserver.c $(DEFS)
 71
 72 dataserver.o : dataserver.c Makefile
 73         @$(ECHO) "Compiling dataserver.c\n"
 74         @$(CC) -c -o dataserver.o dataserver.c $(DEFS)
 75
 76 sock_utils.o : sock_utils.c Makefile
 77         @$(ECHO) "Compiling sock_utils.c\n"
 78         @$(CC) -c -o sock_utils.o sock_utils.c $(DEFS)
 79
 80
 81
 82
 83 server.o : server.c Makefile
 84         @$(ECHO) "Compiling server.c\n"
 85         @$(CC) -c -o server.o server.c $(DEFS)
 86
 87 server.bin : server.o base64.o sighandle.o httpserver.o dataserver.o sock_utils.o
 88         @$(ECHO) "Linking server.bin\n\n"
 89         @$(CC) -lsqlite3 -o server.bin server.o base64.o sighandle.o httpserver.o dataserver.o sock_utils.o
 90
 91
 92 client.o : client.c Makefile
 93         @$(ECHO) "Compiling client.c\n"
 94         @$(CC) -c -o client.o client.c $(DEFS)
 95
 96
 97 packets_queue.o : packets_queue.c Makefile
 98         @$(ECHO) "Compiling packets_queue.c\n"
 99         @$(CC) -c -o packets_queue.o packets_queue.c $(DEFS)
100
101
102         
103         
104 client.bin : client.o base64.o sighandle.o sock_utils.o packets_queue.o
105         @$(ECHO) "Linking client.bin\n\n"
106         @$(CC) -lpthread -o client.bin client.o base64.o sighandle.o sock_utils.o packets_queue.o
107
108
109
110
111 clean:
112         @rm -vf *.o *.bin
113
114
115
116
117
118 runc:
119         @./client.bin
120
121
122 runs:
123         @./server.bin
124
125
126
127 kills:
128         @killall server.bin
129
130
131 killc:
132         @killall client.bin
133
134
135

common.h :

Необходимые директивы (include, define):

  1 #ifndef common___H
  2 #define common___H
  3
  4 #include <stdio.h>
  5 #include <stdlib.h>
  6
  7 #include <sys/types.h>
  8 #include <sys/stat.h>
  9 #include <fcntl.h>
 10 #include <stdio.h>
 11 #include <stdlib.h>
 12 #include <unistd.h>
 13 #include <string.h>
 14 #include <sys/socket.h>
 15 #include <netdb.h>
 16
 17 #include <pthread.h>
 18
 19 #include <signal.h>
 20
 21 #include <time.h>
 22
 23 #include <poll.h>
 24
 25 #if defined (httpserver___C) || defined (dataserver___C)
 26 #include <sqlite3.h>
 27 #endif
 28
 29

Типы данных (enum, struct):

 30
 31 enum get_states
 32 {
 33         initial_get,
 34         letter_G,
 35         letter_E,
 36         letter_T,
 37         space_1,
 38         slash_symbol,
 39         bad_string,
 40         get_ok
 41 };
 42
 43 enum auth_states
 44 {
 45         initial,
 46         string_in_progress,
 47         auth_ok
 48 };
 49
 50 enum data_pack_states
 51 {
 52         clientid,
 53         clientid_value,
 54         header,
 55         header_value,
 56         content,
 57         content_value,
 58         bad_data,
 59         data_ok
 60 };
 61
 62 typedef struct {
 63         char *source;
 64         char *line_header;
 65         char *line_content;
 66 } data_packet_t;
 67
 68 typedef struct {
 69         data_packet_t packet;
 70         enum data_pack_states state;
 71         int temp_pos;
 72         char *curr_keyword;
 73         int keyword_pos;
 74 } parse_data_t;
 75
 76 typedef struct _packets_queue_el_t {
 77         data_packet_t packet;
 78         struct _packets_queue_el_t *prev;
 79         struct _packets_queue_el_t *next;
 80 } packets_queue_el_t;
 81
 82 typedef struct {
 83         pthread_mutex_t         queue_mutex;
 84         time_t                  last_flush;
 85         long                    len;
 86         long                    max_len;
 87         long                    send_errors;
 88         long                    max_send_errors;
 89         packets_queue_el_t      *first;
 90         packets_queue_el_t      *last;
 91 } packets_queue_t;
 92
 93 enum messages_states {
 94         start_process,
 95         process_header,
 96         process_content,
 97         line_complete
 98 };
 99
100 typedef struct {
101         packets_queue_t *queue;
102         data_packet_t packet;
103         enum messages_states state;
104         int temp_pos;
105 } parse_messages_t;
106

Некоторые extern-описания (данные, функции):

107 #ifndef main___C
108 extern char base64[];
109 extern char buffer[MAX_BUFFER];
110 extern volatile sig_atomic_t exit_code;
111 #endif
112
113 #ifndef server___C
114 extern pid_t http_fork;
115 extern pid_t http_fork_child;
116 extern pid_t data_fork_child;
117 extern char auth_test_string[MAX_STRLEN];
118 #endif
119
120 #ifndef sighandle___C
121 void sighandler (int sig);
122 void check_exit ();
123 #endif
124
125 #ifndef httpserver___C
126 int check_auth (char *in_buff);
127 int proceed_http_connection (int connsock_fd);
128 #endif
129
130
131 #endif // common___H
132
133

server.c :

 1 #ifndef main___C
 2 #define main___C
 3
 4 #ifndef server___C
 5 #define server___C
 6 #endif
 7
 8 #include "common.h"
 9
10
11 char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
12 char buffer[MAX_BUFFER];
13 volatile sig_atomic_t exit_code = 0;
14
15 pid_t http_fork = -1;
16 pid_t http_fork_child = -1;
17 pid_t data_fork_child = -1;
18 char auth_test_string[MAX_STRLEN];
19
20
21
22
23
24
25 int main (int argc, char **argv)
26 {
27 struct sigaction act;
28
29
30
31
32         memset (&act, '\0', sizeof (act));
33         act.sa_handler = &sighandler;
34         if (sigaction (SIGTERM, &act, NULL) < 0)
35         {
36                 perror ("sigaction");
37                 exit (EXIT_SIGACTION);
38         }
39
40
41         http_fork = fork ();
42
43         if (http_fork == 0)
44         {
45                 httpserver ();
46         }
47         else
48         {
49                 dataserver ();
50         }
51
52
53
54
55
56
57
58         return EXIT_OK;
59 }
60
61
62 #endif // main___C
63
64
65

base64.c :

 1 #ifndef base64___C
 2 #define base64___C
 3
 4 #include "common.h"
 5

Функция кодирования строки в base64 (необходима для проверки basic http авторизации):

 6 int encode_base64 (char *in_str, char **out_str, int *out_len)
 7 {
 8         int in_len = strlen (in_str);
 9         int in_index, out_index;
10         long b3, temp;
11
12         (*out_len) = (in_len/3 + (in_len%3 == 0?0:1))*4;
13         (*out_str) = (char *) malloc ((*out_len)+1);
14         memset ((*out_str), 0, (*out_len)+1);
15
16         in_index = 0;
17         out_index = 0;
18
19         while (in_index < in_len)
20         {
21                 b3 = in_str[in_index];
22                 b3 <<= 8;
23                 b3 |= (in_index+1 >= in_len) ? 0 : in_str[in_index+1];
24                 b3 <<= 8;
25                 b3 |= (in_index+2 >= in_len) ? 0 : in_str[in_index+2];
26
27
28                 temp = b3 & (0xFC0000);
29                 temp >>= 18;
30                 (*out_str)[out_index] = base64[temp]; out_index++;
31                 temp = b3 & (0x3F000);
32                 temp >>= 12;
33                 (*out_str)[out_index] = base64[temp]; out_index++;
34                 if (in_index+1 < in_len)
35                 {
36                         temp = b3 & (0xFC0);
37                         temp >>= 6;
38                         (*out_str)[out_index] = base64[temp]; out_index++;
39                 }
40                 else
41                 {
42                         (*out_str)[out_index] = '='; out_index++;
43                 }
44                 if (in_index+2 < in_len)
45                 {
46                         temp = b3 & (0x3F);
47                         (*out_str)[out_index] = base64[temp]; out_index++;
48                 }
49                 else
50                 {
51                         (*out_str)[out_index] = '='; out_index++;
52                 }
53
54                 in_index += 3;
55         }
56
57
58 }
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 #endif // base64___C
76
77
78
79
80
81

sighandle.c (используется и в программе сервера и в программе клиента для простейшей обработки сигналов) :

 1 #ifndef sighandle___C
 2 #define sighandle___C
 3
 4 #include "common.h"
 5
 6 void sighandler (int sig)
 7 {
 8         fprintf (stderr, "\nKilling process by signal %d.\n", sig);
 9         exit_code = 1;
10 }
11
12 void check_exit ()
13 {
14         if (exit_code == 1)
15                 exit (EXIT_OK);
16 }
17
18 #endif // sighandle___C
19
20

sock_utils.c :

  1 #ifndef sock_utils___C
  2 #define sock_utils___C
  3
  4 #include "common.h"
  5
  6

Функция создания "слушающего" сокета:

  7
  8
  9 int listen_socket (int *listen_sock, const char *serv_prefix, const char *protocol_name, const char *hostname, const char *port, int max_connections)
 10 {
 11 struct addrinfo hints;
 12 struct addrinfo *result, *rp;
 13 struct protoent *proto;
 14 int err;
 15 int i;
 16
 17
 18         memset (&hints, 0, sizeof (struct addrinfo));
 19         hints.ai_family = AF_INET; //AF_UNSPEC;
 20         hints.ai_socktype = SOCK_STREAM;
 21         proto = getprotobyname (protocol_name);
 22         if (proto != NULL)
 23                 hints.ai_protocol = proto->p_proto;
 24
 25         err = getaddrinfo (hostname, port, &hints, &result);
 26         if (err != 0)
 27         {
 28                 fprintf (stderr, "Error on getaddrinfo for server %s://%s:%s (source '%s': %d):\n\t'%s'.\n", serv_prefix, hostname, port, __FILE__, __LINE__, gai_strerror (err));
 29                 exit (EXIT_GETADDRINFO);
 30         }
 31         else
 32         {
 33                 fprintf (stderr, "Success on getaddrinfo for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 34         }
 35
 36         (*listen_sock) = -1;
 37
 38         for (rp=result; rp!=NULL; rp=rp->ai_next)
 39         {
 40                 (*listen_sock) = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
 41
 42                 if ((*listen_sock) == -1)
 43                         continue;
 44
 45                 if (bind ((*listen_sock), rp->ai_addr, rp->ai_addrlen) == 0)
 46                         break;
 47
 48                 // Bad bind:
 49                 close ((*listen_sock));
 50         }
 51         if ((*listen_sock) == -1)
 52         {
 53                 fprintf (stderr, "Socket creation error for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 54                 exit (EXIT_SOCKET);
 55         }
 56         else
 57         {
 58                 fprintf (stderr, "Success on socket creation for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 59         }
 60         if (rp == NULL)
 61         {
 62                 fprintf (stderr, "Bind error for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 63                 exit (EXIT_BIND);
 64         }
 65         else
 66         {
 67                 fprintf (stderr, "Success on bind for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 68         }
 69
 70         freeaddrinfo (result);
 71
 72
 73         err = listen ((*listen_sock), max_connections);
 74         if (err != 0)
 75         {
 76                 fprintf (stderr, "Error on listen for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 77                 exit (EXIT_LISTEN);
 78         }
 79         else
 80         {
 81                 fprintf (stderr, "Success on listen for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
 82         }
 83
 84
 85
 86         return 0;
 87 }
 88
 89

Функция создания "клиентского" connect-сокета:

 90
 91
 92 int connect_socket (int *sock, const char *serv_prefix, const char *protocol_name, const char *hostname, const char *port)
 93 {
 94 struct addrinfo hints;
 95 struct addrinfo *result, *rp;
 96 struct protoent *proto;
 97 int err;
 98 int i;
 99
100
101         memset (&hints, 0, sizeof (struct addrinfo));
102         hints.ai_family = AF_INET; //AF_UNSPEC;
103         hints.ai_socktype = SOCK_STREAM;
104         proto = getprotobyname (protocol_name);
105         if (proto != NULL)
106                 hints.ai_protocol = proto->p_proto;
107
108         err = getaddrinfo (hostname, port, &hints, &result);
109         if (err != 0)
110         {
111                 fprintf (stderr, "Error on getaddrinfo for server %s://%s:%s (source '%s': %d):\n\t'%s'.\n", serv_prefix, hostname, port, __FILE__, __LINE__, gai_strerror (err));
112                 exit (EXIT_GETADDRINFO);
113         }
114         else
115         {
116                 fprintf (stderr, "Success on getaddrinfo for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
117         }
118
119         (*sock) = -1;
120
121         for (rp=result; rp!=NULL; rp=rp->ai_next)
122         {
123                 (*sock) = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
124
125                 if ((*sock) == -1)
126                         continue;
127
128                 if (connect ((*sock), rp->ai_addr, rp->ai_addrlen) == 0)
129                         break;
130
131                 // Bad connect:
132                 close ((*sock));
133         }
134         if ((*sock) == -1)
135         {
136                 fprintf (stderr, "Socket creation error for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
137                 exit (EXIT_SOCKET);
138         }
139         else
140         {
141                 fprintf (stderr, "Success on socket creation for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
142         }
143         if (rp == NULL)
144         {
145                 fprintf (stderr, "Connect error for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
146                 exit (EXIT_CONNECT);
147         }
148         else
149         {
150                 fprintf (stderr, "Success on connect for server %s://%s:%s (source '%s': %d).\n", serv_prefix, hostname, port, __FILE__, __LINE__);
151         }
152
153         freeaddrinfo (result);
154
155
156         return 0;
157 }
158
159
160
161
162
163
164
165
166
167
168
169 #endif // sock_utils___C
170
171

httpserver.c :

  1 #ifndef httpserver___C
  2 #define httpserver___C
  3
  4 #include "common.h"
  5
  6 char test_str[MAX_BUFFER];
  7

Фунция проверки http-запроса (проверяет, что запрашивается /, а не /favicon.ico какой-нибудь):

  8 int check_get (char *in_buff)
  9 {
 10         enum get_states state = initial_get;
 11         int temp_pos, in_pos;
 12         in_pos = 0;
 13         while ((in_buff[in_pos] != 0) && (state != get_ok))
 14         {
 15                 switch (state)
 16                 {
 17                         case bad_string:
 18                                 switch (in_buff[in_pos])
 19                                 {
 20                                         case 10:
 21                                         case 13:
 22                                                 state = initial_get;
 23                                                 break;
 24                                         default:
 25                                                 break;
 26                                 }
 27                                 break;
 28                         case initial_get:
 29                                 switch (in_buff[in_pos])
 30                                 {
 31                                         case 10:
 32                                         case 13:
 33                                                 break;
 34                                         case 'G':
 35                                                 state = letter_G;
 36                                                 break;
 37                                         default:
 38                                                 state = bad_string;
 39                                 }
 40                                 break;
 41                         case letter_G:
 42                                 switch (in_buff[in_pos])
 43                                 {
 44                                         case 'E':
 45                                                 state = letter_E;
 46                                                 break;
 47                                         default:
 48                                                 state = bad_string;
 49                                 }
 50                                 break;
 51                         case letter_E:
 52                                 switch (in_buff[in_pos])
 53                                 {
 54                                         case 'T':
 55                                                 state = letter_T;
 56                                                 break;
 57                                         default:
 58                                                 state = bad_string;
 59                                 }
 60                                 break;
 61                         case letter_T:
 62                                 switch (in_buff[in_pos])
 63                                 {
 64                                         case ' ':
 65                                                 state = space_1;
 66                                                 break;
 67                                         default:
 68                                                 state = bad_string;
 69                                 }
 70                                 break;
 71                         case space_1:
 72                                 switch (in_buff[in_pos])
 73                                 {
 74                                         case ' ':
 75                                                 break;
 76                                         case '/':
 77                                                 state = slash_symbol;
 78                                                 break;
 79                                         default:
 80                                                 state = bad_string;
 81                                 }
 82                                 break;
 83                         case slash_symbol:
 84                                 switch (in_buff[in_pos])
 85                                 {
 86                                         case ' ':
 87                                                 state = get_ok;
 88                                                 break;
 89                                         default:
 90                                                 state = bad_string;
 91                                 }
 92                                 break;
 93                 }
 94                 in_pos++;
 95         }
 96         if (state == get_ok)
 97         {
 98                 return 1;
 99         }
100         else
101         {
102                 return 0;
103         }
104 }
105
106

Фунция проверки http-запроса (проверяет basic авторизацию):

107
108 int check_auth (char *in_buff)
109 {
110         enum auth_states state = initial;
111         int temp_pos, in_pos;
112         in_pos = 0;
113         while ((in_buff[in_pos] != 0) && (state != auth_ok))
114         {
115                 switch (state)
116                 {
117                         case initial:
118                                 switch (in_buff[in_pos])
119                                 {
120                                         case 10:
121                                         case 13:
122                                                 break;
123                                         default:
124                                                 memset (test_str, 0, MAX_BUFFER);
125                                                 test_str[0] = in_buff[in_pos];
126                                                 temp_pos = 1;
127                                                 state = string_in_progress;
128                                 }
129                                 break;
130                         case string_in_progress:
131                                 switch (in_buff[in_pos])
132                                 {
133                                         case 10:
134                                         case 13:
135                                                 if (strcmp (test_str, auth_test_string) == 0)
136                                                 {
137                                                         state = auth_ok;
138                                                 }
139                                                 else
140                                                 {
141                                                         state = initial;
142                                                 }
143                                                 break;
144                                         default:
145                                                 test_str[temp_pos] = in_buff[in_pos];
146                                                 temp_pos++;
147                                 }
148                                 break;
149                 }
150                 in_pos++;
151         }
152         if (state == auth_ok)
153         {
154                 return 1;
155         }
156         else
157         {
158                 if (strcmp (test_str, auth_test_string) == 0)
159                 {
160                         return 1;
161                 }
162                 else
163                 {
164                         return 0;
165                 }
166         }
167 }
168
169 char td_str[MAX_STRLEN];
170

Callback функция для sqlite3_exec (каждую строку, полученную из базы, оформляет в строку html-таблицы и записывает в сокет):

171 int my_sqlite_callback (void *out_sock, int argc, char **argv, char **azColName)
172 {
173 int *sock = (int *) out_sock;
174 int i;
175
176         write ((*sock), "<tr align=\"left\" valign=\"middle\">\n", strlen ("<tr align=\"left\" valign=\"middle\">\n"));
177         for(i=0; i<argc; i++)
178         {
179                 memset (td_str, 0, MAX_STRLEN);
180                 sprintf (td_str, "<td><small>%s</small></td>\n", (argv[i]) ? argv[i] : "-");
181                 write ((*sock), td_str, strlen (td_str));
182         }
183         write ((*sock), "\n</tr>\n", strlen ("\n</tr>\n"));
184
185         return 0;
186 }

Фунция, выполняющая основные операции по работе с базой sqlite (open, exec, close):

187
188 int get_table (char *buff, int *out_sock)
189 {
190 sqlite3 *db;
191 char *zErrMsg = 0;
192 int rc;
193
194         memset (buff, 0, MAX_BUFFER);
195         strcat (buff, "<table border=1 width=\"1280px\">\n");
196         
197         strcat (buff, "<tr align=\"center\" valign=\"middle\"><td width=\"100px\" bgcolor=\"#1253dd\"><font color=\"#ffffff\">Site:</font></td><td width=\"300px\" bgcolor=\"#1253dd\"><font color=\"#ffffff\">Message header:</font></td><td bgcolor=\"#1253dd\"><font color=\"#ffffff\">Message content:</font></td></tr>\n");
198
199         rc = sqlite3_open (SQLITE_DB, &db);
200         if (rc)
201         {
202                 fprintf (stderr, "Can't open database '%s': %s\n", SQLITE_DB, sqlite3_errmsg(db));
203                 sqlite3_close (db);
204                 strcat (buff, "<tr align=\"center\" valign=\"middle\"><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">ERROR</font></td><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">CAN NOT OPEN DB FILE</font></td><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">!!!</font></td></tr>\n");
205                 strcat (buff, "</table>\n");
206                 write ((*out_sock), buff, strlen (buff));
207                 return 1;
208         }
209
210         write ((*out_sock), buff, strlen (buff));
211
212         rc = sqlite3_exec (db, "select source, line_header, line_content from messages order by source", my_sqlite_callback, out_sock, &zErrMsg);
213
214         memset (buff, 0, strlen (buff));
215         if (rc != SQLITE_OK)
216         {
217                 fprintf (stderr, "SQL error: %s\n", zErrMsg);
218                 sqlite3_free (zErrMsg);
219                 strcat (buff, "<tr align=\"center\" valign=\"middle\"><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">ERROR</font></td><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">SQL ERROR</font></td><td bgcolor=\"#ff0000\"><font color=\"#ffffff\">!!!</font></td></tr>\n");
220                 strcat (buff, "</table>\n");
221                 write ((*out_sock), buff, strlen (buff));
222                 return 2;
223         }
224         sqlite3_close (db);
225
226         strcat (buff, "</table>\n");
227         write ((*out_sock), buff, strlen (buff));
228
229         return 0;
230 }

Фунция (процесс-потомок) для обработки входящего http-соединения:

231
232 int proceed_http_connection (int connsock_fd)
233 {
234
235
236         memset (buffer, 0, MAX_BUFFER);
237         read (connsock_fd, buffer, MAX_BUFFER);
238         if (check_auth (buffer) == 1)
239         {
240                 if (check_get (buffer) == 1)
241                 {
242                         memset (buffer, 0, MAX_BUFFER);
243                         sprintf (buffer, "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n");
244                         strcat (buffer, "<html>\n<head>\n</head>\n<body>\n<center>\n");
245                         write (connsock_fd, buffer, strlen (buffer));
246                         get_table (buffer, &connsock_fd);
247                         memset (buffer, 0, MAX_BUFFER);
248                         strcat (buffer, "\n</center>\n</body>\n</html>\r\n\r\n");
249                         write (connsock_fd, buffer, strlen (buffer));
250                 }
251                 else
252                 {
253                         memset (buffer, 0, MAX_BUFFER);
254                         sprintf (buffer, "HTTP/1.1 204 No Content\r\n");
255                         write (connsock_fd, buffer, strlen (buffer));
256                 }
257         }
258         else
259         {
260                 memset (buffer, 0, MAX_BUFFER);
261                 sprintf (buffer, "HTTP/1.1 401 Unauthorized\r\nWWW-Authenticate: Basic realm=\"%s\"\r\n", SERVERHTTP_REALM);
262                 write (connsock_fd, buffer, strlen (buffer));
263         }
264         close (connsock_fd);
265         return EXIT_OK;
266 }
267
268
269
270
271
272 char ucp_str[MAX_STRLEN];
273
274

Функция с главным циклом http-сервера:

275
276 int httpserver ()
277 {
278 char *b64ucp_str;
279 int  b64ucp_strlen;
280 int err;
281 int sock_fd;
282 int newsock_fd;
283 int i;
284
285         memset (ucp_str, 0, MAX_STRLEN);
286         sprintf (ucp_str, "%s:%s", SERVERHTTP_USER, SERVERHTTP_PASSWORD);
287         encode_base64 (ucp_str, &b64ucp_str, &b64ucp_strlen);
288         memset (auth_test_string, 0, MAX_STRLEN);
289         sprintf (auth_test_string, "Authorization: Basic %s", b64ucp_str);
290
291         listen_socket (&sock_fd, "http", "tcp", SERVERHTTP_ADDR_STR, SERVERHTTP_PORT_STR, SERVERHTTP_MAX_CONNECTIONS);
292
293
294         while (exit_code != 1)
295         {
296                 struct sockaddr_in client_addr;
297                 socklen_t client_addr_len = sizeof (client_addr);
298                 int newsock_fd;
299
300                 newsock_fd = accept (sock_fd, NULL, NULL);
301
302                 check_exit ();
303
304                 if (newsock_fd != -1)
305                 {
306                         http_fork_child = fork ();
307                         if (http_fork_child == 0)
308                         {
309                                 close (sock_fd);
310                                 err = proceed_http_connection (newsock_fd);
311                                 exit (EXIT_OK);
312                         }
313                         else
314                         {
315                                 close (newsock_fd);
316                                 if (http_fork_child < 0)
317                                 {
318                                         fprintf (stderr, "Error on fork for server http://%s:%s (source '%s': %d).\n", SERVERHTTP_ADDR_STR, SERVERHTTP_PORT_STR, __FILE__, __LINE__);
319                                         exit (EXIT_FORK);
320                                 }
321                                 else
322                                 {
323                                         fprintf (stderr, "Succes on fork (child PID %d) for server http://%s:%s (source '%s': %d).\nListen for new connections ...\n", http_fork_child, SERVERHTTP_ADDR_STR, SERVERHTTP_PORT_STR, __FILE__, __LINE__);
324                                 }
325                         }
326                 }
327                 else
328                 {
329                         fprintf (stderr, "Error on accept (new socket fd == %d) for server http://%s:%s (source '%s': %d).\n", newsock_fd, SERVERHTTP_ADDR_STR, SERVERHTTP_PORT_STR, __FILE__, __LINE__);
330                         exit (EXIT_ACCEPT);
331                 }
332         }
333
334
335         return (0);
336 }
337
338
339
340
341
342
343
344
345
346
347
348
349 #endif // httpserver___C
350
351

dataserver.c :

  1 #ifndef dataserver___C
  2 #define dataserver___C
  3
  4 #include "common.h"
  5
  6 char data_temp[MAX_BUFFER];
  7
  8 char clientid_str[] = CLIENTID_COMMAND_STR;
  9 char header_str[] = HEADER_COMMAND_STR;
 10 char content_str[] = CONTENT_COMMAND_STR;
 11

Функция, преобразующая в строке символ " в апостроф (перед тем, как сохранить строку в базу insert-запросом) :

 12
 13 void escape_quotes (char *buff)
 14 {
 15 int i;
 16
 17         for (i=0; i<strlen (buff); i++)
 18         {
 19                 if (buff[i] == '"')
 20                         buff[i] = '\'';
 21         }
 22 }
 23

Функция парсинга сообщения от клиента (проверяет корректность входящего пакета, разбивает пакет на логические части, когда пакет полностью корректно распознан, он сохраняется в базе):

 24 int get_data (char *in_buff, parse_data_t *parse_state)
 25 {
 26 int in_pos;
 27
 28
 29         in_pos = 0;
 30
 31         while ((in_buff[in_pos] != 0) && (parse_state->state != data_ok) && (parse_state->state != bad_data))
 32         {
 33                 switch (parse_state->state)
 34                 {
 35                         case clientid:
 36                                 if (((in_buff[in_pos] == 10) || (in_buff[in_pos] == 13)) && (parse_state->keyword_pos == 0))
 37                                 {
 38                                 }
 39                                 else
 40                                 {
 41                                         if (parse_state->keyword_pos < strlen (parse_state->curr_keyword))
 42                                         {
 43                                                 if (in_buff[in_pos] == parse_state->curr_keyword[parse_state->keyword_pos])
 44                                                 {
 45                                                         parse_state->keyword_pos++;
 46                                                 }
 47                                                 else
 48                                                 {
 49                                                         parse_state->state = bad_data;
 50                                                 }
 51                                         }
 52                                         else
 53                                         {
 54                                                 parse_state->curr_keyword = header_str;
 55                                                 parse_state->keyword_pos = 0;
 56                                                 memset (data_temp, 0, MAX_BUFFER);
 57                                                 parse_state->temp_pos = 0;
 58                                                 parse_state->state = clientid_value;
 59                                                 in_pos--;
 60                                         }
 61                                 }
 62                                 break;
 63                         case clientid_value:
 64                                 switch (in_buff[in_pos])
 65                                 {
 66                                         case 10:
 67                                         case 13:
 68                                                 parse_state->packet.source = strdup (data_temp);
 69                                                 parse_state->state = header;
 70                                                 break;
 71                                         default:
 72                                                 data_temp[parse_state->temp_pos] = in_buff[in_pos];
 73                                                 parse_state->temp_pos++;
 74                                 }
 75                                 break;
 76                         case header:
 77                                 if (((in_buff[in_pos] == 10) || (in_buff[in_pos] == 13)) && (parse_state->keyword_pos == 0))
 78                                 {
 79                                 }
 80                                 else
 81                                 {
 82                                         if (parse_state->keyword_pos < strlen (parse_state->curr_keyword))
 83                                         {
 84                                                 if (in_buff[in_pos] == parse_state->curr_keyword[parse_state->keyword_pos])
 85                                                 {
 86                                                         parse_state->keyword_pos++;
 87                                                 }
 88                                                 else
 89                                                 {
 90                                                         parse_state->state = bad_data;
 91                                                 }
 92                                         }
 93                                         else
 94                                         {
 95                                                 parse_state->curr_keyword = content_str;
 96                                                 parse_state->keyword_pos = 0;
 97                                                 memset (data_temp, 0, MAX_BUFFER);
 98                                                 parse_state->temp_pos = 0;
 99                                                 parse_state->state = header_value;
100                                                 in_pos--;
101                                         }
102                                 }
103                                 break;
104                         case header_value:
105                                 switch (in_buff[in_pos])
106                                 {
107                                         case 10:
108                                         case 13:
109                                                 parse_state->packet.line_header = strdup (data_temp);
110                                                 parse_state->state = content;
111                                                 break;
112                                         default:
113                                                 data_temp[parse_state->temp_pos] = in_buff[in_pos];
114                                                 parse_state->temp_pos++;
115                                 }
116                                 break;
117                         case content:
118                                 if (((in_buff[in_pos] == 10) || (in_buff[in_pos] == 13)) && (parse_state->keyword_pos == 0))
119                                 {
120                                 }
121                                 else
122                                 {
123                                         if (parse_state->keyword_pos < strlen (parse_state->curr_keyword))
124                                         {
125                                                 if (in_buff[in_pos] == parse_state->curr_keyword[parse_state->keyword_pos])
126                                                 {
127                                                         parse_state->keyword_pos++;
128                                                 }
129                                                 else
130                                                 {
131                                                         parse_state->state = bad_data;
132                                                 }
133                                         }
134                                         else
135                                         {
136                                                 memset (data_temp, 0, MAX_BUFFER);
137                                                 parse_state->temp_pos = 0;
138                                                 parse_state->state = content_value;
139                                                 in_pos--;
140                                         }
141                                 }
142                                 break;
143                         case content_value:
144                                 switch (in_buff[in_pos])
145                                 {
146                                         case 10:
147                                         case 13:
148                                                 parse_state->packet.line_content = strdup (data_temp);
149                                                 parse_state->state = data_ok;
150                                                 break;
151                                         default:
152                                                 data_temp[parse_state->temp_pos] = in_buff[in_pos];
153                                                 parse_state->temp_pos++;
154                                 }
155                                 break;
156                 }
157                 in_pos++;
158         }
159
160         if (parse_state->state != data_ok)
161         {
162                 if (parse_state->state == bad_data)
163                 {
164                         if (parse_state->packet.source != NULL)
165                         {
166                                 free (parse_state->packet.source);
167                                 parse_state->packet.source = NULL;
168                         }
169                         if (parse_state->packet.line_header != NULL)
170                         {
171                                 free (parse_state->packet.line_header);
172                                 parse_state->packet.line_header = NULL;
173                         }
174                         if (parse_state->packet.line_content != NULL)
175                         {
176                                 free (parse_state->packet.line_content);
177                                 parse_state->packet.line_content = NULL;
178                         }
179                         return RET_PARSE_FAILED;
180                 }
181                 else
182                 {
183                         return RET_NEED_INPUT;
184                 }
185         }
186         else
187         {
188                 sqlite3 *db;
189                 char *zErrMsg = 0;
190                 int rc;
191
192
193                 rc = sqlite3_open (SQLITE_DB, &db);
194                 if (rc)
195                 {
196                         fprintf (stderr, "Can't open database '%s': %s\n", SQLITE_DB, sqlite3_errmsg(db));
197                         sqlite3_close (db);
198                         return RET_DB_FAILED;
199                 }
200         
201                 escape_quotes (parse_state->packet.source);
202                 escape_quotes (parse_state->packet.line_header);
203                 escape_quotes (parse_state->packet.line_content);
204                 
205                 memset (data_temp, 0, MAX_BUFFER);
206                 sprintf (data_temp, "insert into messages (source, line_header, line_content) values (\"%s\", \"%s\", \"%s\")", parse_state->packet.source, parse_state->packet.line_header, parse_state->packet.line_content);
207
208                 rc = sqlite3_exec (db, data_temp, NULL, NULL, &zErrMsg);
209                 if (rc != SQLITE_OK)
210                 {
211                         fprintf (stderr, "SQL error: %s\n", zErrMsg);
212                         sqlite3_free (zErrMsg);
213                         return RET_DB_SQLFAILED;
214                 }
215                 sqlite3_close (db);
216
217
218
219
220                 free (parse_state->packet.source);
221                 parse_state->packet.source = NULL;
222                 free (parse_state->packet.line_header);
223                 parse_state->packet.line_header = NULL;
224                 free (parse_state->packet.line_content);
225                 parse_state->packet.line_content = NULL;
226                 return RET_PARSE_COMPLETE;
227         }
228
229
230
231
232 }
233
234

Функция (процесс-потомок) для обработки входящего соединения data-сервера:

235
236 int proceed_data_connection (int connsock_fd)
237 {
238 parse_data_t parse_state;
239 int data;
240
241         parse_state.packet.source = NULL;
242         parse_state.packet.line_header = NULL;
243         parse_state.packet.line_content = NULL;
244         parse_state.keyword_pos = 0;
245         parse_state.curr_keyword = clientid_str;
246         parse_state.state = clientid;
247         parse_state.temp_pos = 0;
248
249         memset (buffer, 0, MAX_BUFFER);
250         read (connsock_fd, buffer, MAX_BUFFER);
251
252         while ((data = get_data (buffer, &parse_state)) == RET_NEED_INPUT)
253         {
254                 memset (buffer, 0, MAX_BUFFER);
255                 read (connsock_fd, buffer, MAX_BUFFER);
256         }
257
258         if (data == RET_PARSE_COMPLETE)
259         {
260                 memset (buffer, 0, MAX_BUFFER);
261                 sprintf (buffer, "OK");
262                 write (connsock_fd, buffer, strlen (buffer));
263         }
264         else
265         {
266                 memset (buffer, 0, MAX_BUFFER);
267                 sprintf (buffer, "ERR");
268                 write (connsock_fd, buffer, strlen (buffer));
269         }
270
271
272         close (connsock_fd);
273         return EXIT_OK;
274 }
275

Функция с главным циклом data-сервера:

276
277 int dataserver ()
278 {
279 int err;
280 int sock_fd;
281 int newsock_fd;
282 int i;
283
284         listen_socket (&sock_fd, "data", "tcp", SERVERDATA_ADDR_STR, SERVERDATA_PORT_STR, SERVERDATA_MAX_CONNECTIONS);
285
286
287         while (exit_code != 1)
288         {
289                 struct sockaddr_in client_addr;
290                 socklen_t client_addr_len = sizeof (client_addr);
291                 int newsock_fd;
292
293                 newsock_fd = accept (sock_fd, NULL, NULL);
294
295                 check_exit ();
296
297                 if (newsock_fd != -1)
298                 {
299                         data_fork_child = fork ();
300                         if (data_fork_child == 0)
301                         {
302                                 close (sock_fd);
303                                 err = proceed_data_connection (newsock_fd);
304                                 exit (EXIT_OK);
305                         }
306                         else
307                         {
308                                 close (newsock_fd);
309                                 if (data_fork_child < 0)
310                                 {
311                                         fprintf (stderr, "Error on fork for server data://%s:%s (source '%s': %d).\n", SERVERDATA_ADDR_STR, SERVERDATA_PORT_STR, __FILE__, __LINE__);
312                                         exit (EXIT_FORK);
313                                 }
314                                 else
315                                 {
316                                         fprintf (stderr, "Succes on fork (child PID %d) for server data://%s:%s (source '%s': %d).\nListen for new connections ...\n", data_fork_child, SERVERDATA_ADDR_STR, SERVERDATA_PORT_STR, __FILE__, __LINE__);
317                                 }
318                         }
319                 }
320                 else
321                 {
322                         fprintf (stderr, "Error on accept (new socket fd == %d) for server data://%s:%s (source '%s': %d).\n", newsock_fd, SERVERDATA_ADDR_STR, SERVERDATA_PORT_STR, __FILE__, __LINE__);
323                         exit (EXIT_ACCEPT);
324                 }
325         }
326
327
328         return (0);
329 }
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350 #endif // dataserver___C
351
352

packets_queue.c (операции для работы с очередью пакетов в многопоточном клиенте) :

  1 #ifndef packets_queue___C
  2 #define packets_queue___C
  3
  4 #include "common.h"
  5

Функция инициализации очереди:

  6 int queue_make (packets_queue_t *queue, int max_packets, int max_errors)
  7 {
  8         pthread_mutex_init (&(queue->queue_mutex), NULL);
  9         queue->last_flush = time (NULL);
 10         queue->len = 0;
 11         queue->max_len = max_packets;
 12         queue->send_errors = 0;
 13         queue->max_send_errors = max_errors;
 14         queue->first = NULL;
 15         queue->last = NULL;
 16         return 0;
 17 }

Функция помещения пакета в очередь (если пакетов становится достаточно для отправки на сервер, вызывается функция выгрузки очереди):

 18
 19 int queue_put (packets_queue_t *queue, data_packet_t *data)
 20 {
 21 packets_queue_el_t *tmp_el;
 22
 23         pthread_mutex_lock (&(queue->queue_mutex));
 24
 25         if (queue->first == NULL)
 26         {
 27                 tmp_el = (packets_queue_el_t *) malloc (sizeof (packets_queue_el_t));
 28                 tmp_el->packet.source = strdup (data->source);
 29                 tmp_el->packet.line_header = strdup (data->line_header);
 30                 tmp_el->packet.line_content = strdup (data->line_content);
 31                 tmp_el->prev = NULL;
 32                 tmp_el->next = NULL;
 33                 queue->first = tmp_el;
 34                 queue->last = tmp_el;
 35         }
 36         else
 37         {
 38                 tmp_el = (packets_queue_el_t *) malloc (sizeof (packets_queue_el_t));
 39                 tmp_el->packet.source = strdup (data->source);
 40                 tmp_el->packet.line_header = strdup (data->line_header);
 41                 tmp_el->packet.line_content = strdup (data->line_content);
 42                 queue->last->next = tmp_el;
 43                 tmp_el->prev = queue->last;
 44                 tmp_el->next = NULL;
 45                 queue->last = tmp_el;
 46         }
 47         queue->len++;
 48
 49         if (queue->len >= queue->max_len)
 50         {
 51                 pthread_mutex_unlock (&(queue->queue_mutex));
 52                 queue_flush (queue);
 53         }
 54         else
 55         {
 56                 pthread_mutex_unlock (&(queue->queue_mutex));
 57         }
 58         return 0;
 59 }
 60
 61
 62 char send_str[MAX_BUFFER];
 63

Функция отправки пакета по сети на сервер (с проверкой ответа сервера):

 64 int send_packet (char *source, char *header, char *content)
 65 {
 66 int err;
 67 int send_sock;
 68
 69         err = connect_socket (&send_sock, "data", "tcp", SERVERDATA_ADDR_STR, SERVERDATA_PORT_STR);
 70
 71         memset (send_str, 0, MAX_BUFFER);
 72         sprintf (send_str, "%s%s\n%s%s\n%s%s\n", CLIENTID_COMMAND_STR, source, HEADER_COMMAND_STR, header, CONTENT_COMMAND_STR, content);
 73
 74         write (send_sock, send_str, strlen (send_str));
 75
 76         memset (send_str, 0, MAX_BUFFER);
 77         err = read (send_sock, send_str, MAX_BUFFER);
 78
 79         if (err > 0)
 80         {
 81                 if (strcmp (send_str, "OK") == 0)
 82                 {
 83                         close (send_sock);
 84                         return RET_SEND_OK;
 85                 }
 86                 else
 87                 {
 88                         close (send_sock);
 89                         return RET_SEND_FAILED;
 90                 }
 91         }
 92         else
 93         {
 94                 close (send_sock);
 95                 return RET_SEND_FAILED;
 96         }
 97
 98
 99
100 }

Функция очистки очереди (выгрузки всех пакетов на сервер с использованием предыдущей функции):

101
102 int queue_flush (packets_queue_t *queue)
103 {
104 packets_queue_el_t *tmp_el;
105
106         pthread_mutex_lock (&(queue->queue_mutex));
107         while (queue->len > 0)
108         {
109                 if (send_packet (queue->first->packet.source, queue->first->packet.line_header, queue->first->packet.line_content) == RET_SEND_OK)
110                 {
111                         if (queue->len > 1)
112                         {
113                                 tmp_el = queue->first;
114                                 queue->first = tmp_el->next;
115                                 queue->first->prev = NULL;
116                                 queue->len--;
117
118                                 free (tmp_el->packet.source);
119                                 free (tmp_el->packet.line_header);
120                                 free (tmp_el->packet.line_content);
121                                 free (tmp_el);
122                         }
123                         else
124                         {
125                                 tmp_el = queue->first;
126                                 queue->first = NULL;
127                                 queue->last = NULL;
128                                 queue->len--;
129
130                                 free (tmp_el->packet.source);
131                                 free (tmp_el->packet.line_header);
132                                 free (tmp_el->packet.line_content);
133                                 free (tmp_el);
134                         }
135                 }
136                 else
137                 {
138                         queue->send_errors++;
139                         if (queue->send_errors > queue->max_send_errors)
140                         {
141                                 pthread_mutex_unlock (&(queue->queue_mutex));
142                                 exit (EXIT_SEND_ERRORS);
143                         }
144                 }
145                 sleep (1);
146         }
147         queue->last_flush = time (NULL);
148         pthread_mutex_unlock (&(queue->queue_mutex));
149
150
151         return 0;
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 #endif // packets_queue___C
170
171

client.c :

  1 #ifndef main___C
  2 #define main___C
  3
  4 #ifndef client___C
  5 #define client___C
  6 #endif
  7
  8
  9 #include "common.h"
 10
 11 char base64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
 12 char buffer[MAX_BUFFER];
 13
 14 volatile sig_atomic_t exit_code = 0;
 15

Функция парсинга буфера, прочитанного из /var/log/messages:

 16
 17 int parse_messages (char *in_buff, parse_messages_t *parse_state)
 18 {
 19 int in_pos;
 20
 21
 22         in_pos = 0;
 23         while (in_buff[in_pos] != 0)
 24         {
 25                 switch (parse_state->state)
 26                 {
 27                         case start_process:
 28                                 switch (in_buff[in_pos])
 29                                 {
 30                                         case 10:
 31                                         case 13:
 32                                                 parse_state->state = line_complete;
 33                                                 break;
 34                                         default:
 35                                                 memset (parse_state->packet.line_header, 0, MAX_STRLEN);
 36                                                 parse_state->temp_pos = 0;
 37                                                 in_pos--;
 38                                                 parse_state->state = process_header;
 39                                 }
 40                                 break;
 41                         case line_complete:
 42                                 switch (in_buff[in_pos])
 43                                 {
 44                                         case 10:
 45                                         case 13:
 46                                                 break;
 47                                         default:
 48                                                 memset (parse_state->packet.line_header, 0, MAX_STRLEN);
 49                                                 parse_state->temp_pos = 0;
 50                                                 in_pos--;
 51                                                 parse_state->state = process_header;
 52                                 }
 53                                 break;
 54                         case process_header:
 55                                 switch (in_buff[in_pos])
 56                                 {
 57                                         case 10:
 58                                         case 13:
 59                                                 memset (parse_state->packet.line_content, 0, MAX_STRLEN);
 60                                                 queue_put (parse_state->queue, &(parse_state->packet));
 61                                                 parse_state->state = line_complete;
 62                                                 break;
 63                                         case ']':
 64                                                 parse_state->packet.line_header[parse_state->temp_pos] = in_buff[in_pos];
 65                                                 memset (parse_state->packet.line_content, 0, MAX_STRLEN);
 66                                                 parse_state->temp_pos = 0;
 67                                                 parse_state->state = process_content;
 68                                                 break;
 69                                         default:
 70                                                 parse_state->packet.line_header[parse_state->temp_pos] = in_buff[in_pos];
 71                                                 parse_state->temp_pos++;
 72                                 }
 73                                 break;
 74                         case process_content:
 75                                 switch (in_buff[in_pos])
 76                                 {
 77                                         case 10:
 78                                         case 13:
 79                                                 queue_put (parse_state->queue, &(parse_state->packet));
 80                                                 parse_state->state = line_complete;
 81                                                 break;
 82                                         default:
 83                                                 parse_state->packet.line_content[parse_state->temp_pos] = in_buff[in_pos];
 84                                                 parse_state->temp_pos++;
 85                                 }
 86                                 break;
 87                 }
 88
 89                 in_pos++;
 90         }
 91
 92         return 0;
 93 }
 94
 95 char read_buff[MAX_BUFFER];

Функция чтения /var/log/messages (используется poll ()):

 96
 97 void  *poll_thread_func (void *arg)
 98 {
 99 packets_queue_t *queue = (packets_queue_t *) arg;
100 parse_messages_t parse_state;
101 int fd;
102 struct pollfd poll_desc;
103 int err;
104
105
106         parse_state.packet.source = (char *) malloc (MAX_STRLEN);
107         memset (parse_state.packet.source, 0, MAX_STRLEN);
108         sprintf (parse_state.packet.source, "%s", CLIENTID_NAME);
109
110         parse_state.packet.line_header = (char *) malloc (MAX_STRLEN);
111         memset (parse_state.packet.line_header, 0, MAX_STRLEN);
112
113         parse_state.packet.line_content = (char *) malloc (MAX_STRLEN);
114         memset (parse_state.packet.line_content, 0, MAX_STRLEN);
115
116         parse_state.queue = queue;
117         parse_state.state = start_process;
118         parse_state.temp_pos = 0;
119
120
121         fd = open ("/var/log/messages", O_RDONLY);
122
123         poll_desc.fd = fd;
124         poll_desc.events = POLLIN | POLLPRI;
125
126         while (1)
127         {
128                 err = poll (&poll_desc, 1, -1);
129                 check_exit ();
130                 if (err == 1)
131                 {
132                         if (((poll_desc.revents & POLLIN) == POLLIN) || ((poll_desc.revents & POLLPRI) == POLLPRI))
133                         {
134                                 memset (read_buff, 0, MAX_BUFFER);
135                                 err = read (fd, read_buff, MAX_BUFFER);
136                                 check_exit ();
137                                 if (err > 0)
138                                 {
139                                         parse_messages (read_buff, &parse_state);
140                                 }
141                         }
142                 }
143         }
144
145 }

Функция с циклом, отправляющим данные из очереди на сервер по времени:

146
147 int send_function (packets_queue_t *queue)
148 {
149         while (1)
150         {
151                 fprintf (stderr, "TIMEOUT: %d\n", MAX_TIMEOUT);
152                 sleep (MAX_TIMEOUT);
153                 check_exit ();
154                 queue_flush (queue);
155         }
156         return 0;
157 }
158

Функция main :)

159
160 int main (int argc, char **argv)
161 {
162 packets_queue_t main_queue;
163 pthread_t poll_thread;
164 int rc1;
165 struct sigaction act;
166
167
168
169
170         memset (&act, '\0', sizeof (act));
171         act.sa_handler = &sighandler;
172         if (sigaction (SIGTERM, &act, NULL) < 0)
173         {
174                 perror ("sigaction");
175                 exit (EXIT_SIGACTION);
176         }
177
178
179         queue_make (&main_queue, MAX_PACKETS, MAX_TIMEOUT);
180
181         if ( (rc1=pthread_create (&poll_thread, NULL, &poll_thread_func, &main_queue)) )
182         {
183                 fprintf (stderr, "Poll thread creation failed: %d.\n", rc1);
184                 exit (EXIT_THREAD);
185         }
186
187
188         send_function (&main_queue);
189
190
191
192
193         return 0;
194 }
195
196
197
198 #endif // main___C
199
200

Безусловно, программам не хватает проверок записи/чтения, не помешала бы реализация отправки нескольких пакетов за одну сессию, и т.п. Но здесь представлен только результат работы, которую удалось проделать за неделю (на Си уже год ничего не писал, сетевых сервисов до этого тоже не писал).


21 мая 2012
© Андрей В. Орехов, 2012. (i@orekhov-av.ru)
Использование материалов сайта без ссылки на orekhov-av.ru запрещено.
Использование материалов сайта в оффлайновых изданиях запрещено.